Skip to content

Commit

Permalink
[Feature][Mongodb-CDC] support multi-table read
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 committed Nov 15, 2024
1 parent e452278 commit 0764304
Showing 1 changed file with 24 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;

import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -47,6 +48,7 @@
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
Expand All @@ -56,7 +58,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -156,10 +157,6 @@ public void startUp() {
}

@TestTemplate
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "")
public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container)
throws InterruptedException {
cleanSourceTable();
Expand All @@ -183,10 +180,6 @@ public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container)
}

@TestTemplate
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
disabledReason = "")
public void testMongodbCdcMultiTableToMysqlCheckDataE2e(TestContainer container)
throws InterruptedException {
cleanSourceTable();
Expand Down Expand Up @@ -262,7 +255,28 @@ private void assertionsSourceAndSink(String mongodbCollection, String sinkMysqlQ
.map(
entryStream ->
entryStream
.map(Map.Entry::getValue)
.map(
entry -> {
Object value =
entry
.getValue();
if (value
instanceof
Number) {
return new BigDecimal(
value
.toString())
.intValue();
}
if (value
instanceof
ObjectId) {
return ((ObjectId)
value)
.toString();
}
return value;
})
.collect(
Collectors.toCollection(
ArrayList
Expand Down

0 comments on commit 0764304

Please sign in to comment.