diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java index 4a9dd34daea..89d7e8d13c0 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java @@ -21,7 +21,6 @@ import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter; -import org.apache.seatunnel.api.sink.event.WriterCloseEvent; import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.tracing.MDCTracer; @@ -327,10 +326,6 @@ public void close() throws IOException { (identifier, sinkWriter) -> { try { sinkWriter.close(); - sinkWritersContext - .get(identifier) - .getEventListener() - .onEvent(new WriterCloseEvent()); } catch (Throwable e) { if (firstE[0] == null) { firstE[0] = e; diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java index 8e45bbf9de5..79a4dbe1f81 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java @@ -44,7 +44,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Set; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -109,16 +109,23 @@ public void testEventReport() throws IOException, InterruptedException { arrayNode.elements().forEachRemaining(jsonNode -> events.add(jsonNode)); } } - Assertions.assertEquals(10, events.size()); - Set eventTypes = - events.stream().map(e -> e.get("eventType").asText()).collect(Collectors.toSet()); + Map eventMap = + events.stream() + .map(e -> e.get("eventType").asText()) + .collect(Collectors.groupingBy(e -> e, Collectors.summingInt(e -> 1))); Assertions.assertTrue( - eventTypes.containsAll( - Arrays.asList( - EventType.LIFECYCLE_ENUMERATOR_OPEN.name(), - EventType.LIFECYCLE_ENUMERATOR_CLOSE.name(), - EventType.LIFECYCLE_READER_OPEN.name(), - EventType.LIFECYCLE_READER_CLOSE.name(), - EventType.LIFECYCLE_WRITER_CLOSE.name()))); + eventMap.keySet() + .containsAll( + Arrays.asList( + EventType.LIFECYCLE_ENUMERATOR_OPEN.name(), + EventType.LIFECYCLE_ENUMERATOR_CLOSE.name(), + EventType.LIFECYCLE_READER_OPEN.name(), + EventType.LIFECYCLE_READER_CLOSE.name(), + EventType.LIFECYCLE_WRITER_CLOSE.name()))); + Assertions.assertEquals(2, eventMap.get(EventType.LIFECYCLE_READER_OPEN.name())); + Assertions.assertEquals(1, eventMap.get(EventType.LIFECYCLE_ENUMERATOR_OPEN.name())); + Assertions.assertEquals(1, eventMap.get(EventType.LIFECYCLE_ENUMERATOR_CLOSE.name())); + Assertions.assertEquals(2, eventMap.get(EventType.LIFECYCLE_READER_CLOSE.name())); + Assertions.assertEquals(2, eventMap.get(EventType.LIFECYCLE_WRITER_CLOSE.name())); } }