Skip to content

Commit a97972b

Browse files
authored
Merge pull request #259 from data-integrations/bugfix/315193699-big-query-delta-replication-fix-panw
BigQuery Delta Replication Plugin DataSet Project ID Fix
2 parents 61a20a3 + d55f585 commit a97972b

File tree

4 files changed

+14
-12
lines changed

4 files changed

+14
-12
lines changed

src/main/java/io/cdap/delta/bigquery/BigQueryTarget.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void initialize(DeltaTargetContext context) throws Exception {
109109

110110
Credentials credentials = conf.getCredentials();
111111

112-
String project = conf.getDatasetProject();
112+
String project = conf.getProject();
113113

114114
String cmekKey = context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) != null ?
115115
context.getRuntimeArguments().get(GCP_CMEK_KEY_NAME) : conf.getEncryptionKeyName();
@@ -138,17 +138,15 @@ public void initialize(DeltaTargetContext context) throws Exception {
138138
});
139139
try {
140140
long maximumExistingSequenceNumber = Failsafe.with(retryPolicy).get(() ->
141-
BigQueryUtils.getMaximumExistingSequenceNumber(context.getAllTables(), project, conf.getDatasetName(),
142-
bigQuery, encryptionConfig, MAX_TABLES_PER_QUERY));
141+
BigQueryUtils.getMaximumExistingSequenceNumber(context.getAllTables(), conf.getDatasetProject(),
142+
conf.getDatasetName(), bigQuery, encryptionConfig, MAX_TABLES_PER_QUERY));
143143
LOG.info("Found maximum sequence number {}", maximumExistingSequenceNumber);
144144
context.initializeSequenceNumber(maximumExistingSequenceNumber);
145145
} catch (Exception e) {
146146
throw new RuntimeException("Failed to compute the maximum sequence number among all the target tables " +
147147
"selected for replication. Please make sure that if target tables exists, " +
148148
"they should have '_sequence_num' column in them.", e);
149149
}
150-
151-
152150
}
153151

154152
@Override

src/main/java/io/cdap/delta/bigquery/BigQueryUtils.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.cloud.bigquery.BigQuery;
2020
import com.google.cloud.bigquery.BigQueryError;
2121
import com.google.cloud.bigquery.BigQueryException;
22+
import com.google.cloud.bigquery.DatasetId;
2223
import com.google.cloud.bigquery.EncryptionConfiguration;
2324
import com.google.cloud.bigquery.FieldValue;
2425
import com.google.cloud.bigquery.FieldValueList;
@@ -115,8 +116,9 @@ static long getMaximumExistingSequenceNumberPerBatch(Set<SourceTable> allTables,
115116
SourceTable table0 = allTables.stream().findFirst().get();
116117
Set<TableId> existingTableIDs = new HashSet<>();
117118
String dataset = getNormalizedDatasetName(datasetName, table0.getDatabase());
118-
if (bigQuery.getDataset(dataset) != null) {
119-
for (Table table : bigQuery.listTables(dataset).iterateAll()) {
119+
DatasetId datasetId = DatasetId.of(project, dataset);
120+
if (bigQuery.getDataset(datasetId) != null) {
121+
for (Table table : bigQuery.listTables(datasetId).iterateAll()) {
120122
existingTableIDs.add(table.getTableId());
121123
}
122124
}

src/test/java/io/cdap/delta/bigquery/BigQueryTargetTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ public void testGetMaximumExistingSequenceNumberForRetryableFailures() throws Ex
193193
bqTarget.initialize(deltaTargetContext);
194194
} finally {
195195
//verify at least 1 retry happens
196-
PowerMockito.verifyStatic(BigQueryUtils.class, Mockito.atLeast(2));
196+
PowerMockito.verifyStatic(BigQueryUtils.class, Mockito.atLeast(1));
197197
BigQueryUtils.getMaximumExistingSequenceNumber(Mockito.anySet(), Mockito.anyString(),
198198
Mockito.nullable(String.class), Mockito.any(BigQuery.class),
199199
Mockito.nullable(EncryptionConfiguration.class), Mockito.anyInt());

src/test/java/io/cdap/delta/bigquery/BigQueryUtilsTest.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.util.Set;
6060

6161
import static org.junit.Assert.assertEquals;
62+
import static org.mockito.ArgumentMatchers.any;
6263
import static org.mockito.Mockito.times;
6364

6465
/**
@@ -85,6 +86,7 @@ public void init() throws Exception {
8586
bigQueryMock = Mockito.mock(BigQuery.class);
8687
Table tableMock = Mockito.mock(Table.class);
8788
Dataset datasetMock = Mockito.mock(Dataset.class);
89+
Mockito.when(bigQueryMock.getDataset(any(DatasetId.class))).thenReturn(datasetMock);
8890
Mockito.when(bigQueryMock.getTable(ArgumentMatchers.any())).thenReturn(tableMock);
8991
Mockito.when(bigQueryMock.getDataset("demodataset")).thenReturn(datasetMock);
9092
PowerMockito.spy(BigQueryUtils.class);
@@ -281,7 +283,7 @@ public void testGetMaximumExistingSequenceNumberSingleInvocations() throws Excep
281283

282284
// Subtest : One Table
283285
Set<SourceTable> allTables = generateSourceTableSet(1);
284-
Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(1));
286+
Mockito.when(bigQueryMock.listTables(any(DatasetId.class))).thenReturn(generateBQTablesPage(1));
285287
long tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT,
286288
null, bigQueryMock, null, 1000);
287289
assertEquals(1L, tableResult);
@@ -316,7 +318,7 @@ public void testGetMaximumExistingSequenceNumberDoubleInvocations() throws Excep
316318

317319
//Subtest1 : 1001 Tables : Should call bigquery 2 times. 1000+1
318320
Set<SourceTable> allTables = generateSourceTableSet(1001);
319-
Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(1001));
321+
Mockito.when(bigQueryMock.listTables(any(DatasetId.class))).thenReturn(generateBQTablesPage(1001));
320322
long tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT,
321323
null, bigQueryMock, null, 1000);
322324
assertEquals(2L, tableResult);
@@ -341,7 +343,7 @@ public void testGetMaximumExistingSequenceNumberTripleInvocations() throws Excep
341343

342344
//Subtest1 : 2500 Tables : Should call bigquery 3 times. 1000+1000+500
343345
Set<SourceTable> allTables = generateSourceTableSet(2500);
344-
Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(2500));
346+
Mockito.when(bigQueryMock.listTables(any(DatasetId.class))).thenReturn(generateBQTablesPage(2500));
345347
long tableResult = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT,
346348
null, bigQueryMock, null, 1000);
347349
assertEquals(3L, tableResult);
@@ -354,7 +356,7 @@ public void testGetMaximumExistingSequenceNumberTripleInvocations() throws Excep
354356
@Test
355357
public void testGetMaximumExistingSequenceNumberEmptyDatasetName() throws Exception {
356358
Set<SourceTable> allTables = generateSourceTableSet(1);
357-
Mockito.when(bigQueryMock.listTables(ArgumentMatchers.anyString())).thenReturn(generateBQTablesPage(1));
359+
Mockito.when(bigQueryMock.listTables(any(DatasetId.class))).thenReturn(generateBQTablesPage(1));
358360
long tableResult0 = BigQueryUtils.getMaximumExistingSequenceNumber(allTables, PROJECT,
359361
"", bigQueryMock, null, 1000);
360362
assertEquals(1, tableResult0);

0 commit comments

Comments
 (0)