diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml
index fe097505b..ee8857ec1 100644
--- a/flink-connector-kafka/pom.xml
+++ b/flink-connector-kafka/pom.xml
@@ -72,6 +72,14 @@ under the License.
${kafka.version}
+
+
+
+ org.apache.commons
+ commons-collections4
+ 4.4
+
+
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index f291b05bc..9303998e9 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -59,7 +59,7 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
-import org.apache.commons.collections.map.LinkedMap;
+import org.apache.commons.collections4.map.LinkedMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index c5bc3b003..1d89f2bea 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -30,7 +30,7 @@
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.DockerImageVersions;
-import org.apache.commons.collections.list.UnmodifiableList;
+import org.apache.commons.collections4.list.UnmodifiableList;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
@@ -250,7 +250,9 @@ public KafkaSourceBuilder getSourceBuilder(
@SuppressWarnings("unchecked")
public Collection> getAllRecordsFromTopic(
Properties properties, String topic) {
- return UnmodifiableList.decorate(KafkaUtil.drainAllRecordsFromTopic(topic, properties));
+ List> records =
+ KafkaUtil.drainAllRecordsFromTopic(topic, properties);
+ return UnmodifiableList.unmodifiableList((List) records);
}
@Override
diff --git a/flink-sql-connector-kafka/pom.xml b/flink-sql-connector-kafka/pom.xml
index ca877f3eb..634e00dcf 100644
--- a/flink-sql-connector-kafka/pom.xml
+++ b/flink-sql-connector-kafka/pom.xml
@@ -70,6 +70,7 @@ under the License.
org.apache.flink:flink-connector-base
org.apache.flink:flink-connector-kafka
org.apache.kafka:*
+ org.apache.commons:commons-collections4
@@ -91,6 +92,10 @@ under the License.
org.apache.kafka
org.apache.flink.kafka.shaded.org.apache.kafka
+
+ org.apache.commons
+ org.apache.flink.kafka.shaded.org.apache.commons
+
diff --git a/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE b/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
index 1ca013b7b..9b1d1db4c 100644
--- a/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
+++ b/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
@@ -7,3 +7,4 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
- org.apache.kafka:kafka-clients:3.4.0
+- org.apache.commons:commons-collections4:4.4