Skip to content

Commit

Permalink
[FLINK-32522][connectors/kafka] Kafka connector should depend on comm…
Browse files Browse the repository at this point in the history
…ons-collections instead of inheriting from flink
  • Loading branch information
chucheng92 committed Oct 19, 2023
1 parent b0f15f2 commit e4770fb
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 3 deletions.
8 changes: 8 additions & 0 deletions flink-connector-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ under the License.
<version>${kafka.version}</version>
</dependency>

<!-- Others -->

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>

<!-- Tests -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;

import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.DockerImageVersions;

import org.apache.commons.collections.list.UnmodifiableList;
import org.apache.commons.collections4.list.UnmodifiableList;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
Expand Down Expand Up @@ -250,7 +250,9 @@ public <T> KafkaSourceBuilder<T> getSourceBuilder(
@SuppressWarnings("unchecked")
public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
Properties properties, String topic) {
return UnmodifiableList.decorate(KafkaUtil.drainAllRecordsFromTopic(topic, properties));
List<ConsumerRecord<byte[], byte[]>> records =
KafkaUtil.drainAllRecordsFromTopic(topic, properties);
return UnmodifiableList.unmodifiableList((List) records);
}

@Override
Expand Down
5 changes: 5 additions & 0 deletions flink-sql-connector-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ under the License.
<include>org.apache.flink:flink-connector-base</include>
<include>org.apache.flink:flink-connector-kafka</include>
<include>org.apache.kafka:*</include>
<include>org.apache.commons:commons-collections4</include>
</includes>
</artifactSet>
<filters>
Expand All @@ -91,6 +92,10 @@ under the License.
<pattern>org.apache.kafka</pattern>
<shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>org.apache.flink.kafka.shaded.org.apache.commons</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)

- org.apache.kafka:kafka-clients:3.4.0
- org.apache.commons:commons-collections4:4.4

0 comments on commit e4770fb

Please sign in to comment.