diff --git a/bitsail-connectors/connector-kafka/pom.xml b/bitsail-connectors/connector-kafka/pom.xml
index 3abda2e18..4fcb77886 100644
--- a/bitsail-connectors/connector-kafka/pom.xml
+++ b/bitsail-connectors/connector-kafka/pom.xml
@@ -35,6 +35,13 @@
+
+        <dependency>
+            <groupId>com.bytedance.bitsail</groupId>
+            <artifactId>connector-base</artifactId>
+            <version>${revision}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.kafka</groupId>
diff --git a/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/constants/KafkaConstants.java b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/constants/KafkaConstants.java
index 21cce42fd..d55cb343b 100644
--- a/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/constants/KafkaConstants.java
+++ b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/constants/KafkaConstants.java
@@ -87,4 +87,16 @@ public class KafkaConstants {
   public static final String DISABLE_CURRENT_OFFSET_RATE_METRICS = "disableCurrentOffsetsRateMetrics";
   public static final int MAX_PARALLELISM = 5;
   public static final int REQUEST_TIMEOUT_MS_CONFIG = 1200 * 1000;
+
+  public static final String JSON_FORMAT = "json";
+
+  public static final String CONSUMER_OFFSET_LATEST_KEY = "latest";
+
+  public static final String CONSUMER_OFFSET_EARLIEST_KEY = "earliest";
+
+  public static final String CONSUMER_OFFSET_TIMESTAMP_KEY = "timestamp";
+
+  public static final Long CONSUMER_STOPPING_OFFSET = Long.MAX_VALUE;
+
+  public static final String DEFAULT_CLIENT_ID = "-coordinator-admin-client-";
 }
diff --git a/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/error/KafkaErrorCode.java b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/error/KafkaErrorCode.java
new file mode 100644
index 000000000..95679859b
--- /dev/null
+++ b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/error/KafkaErrorCode.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.kafka.error;
+
+import com.bytedance.bitsail.common.exception.ErrorCode;
+
+public enum KafkaErrorCode implements ErrorCode {
+
+  TOPIC_NOT_EXISTS("Kafka-0", "Kafka fetch partitions failed."),
+  CONSUMER_CREATE_FAILED("Kafka-1", "Kafka AdminClient create failed."),
+  CONSUMER_FETCH_OFFSET_FAILED("Kafka-2", "Kafka AdminClient fetch offset failed."),
+  CONSUMER_SEEK_OFFSET_FAILED("Kafka-3", "Kafka AdminClient seek offset failed.");
+
+  public final String code;
+  public final String description;
+
+  KafkaErrorCode(String code, String description) {
+    this.code = code;
+    this.description = description;
+  }
+
+  @Override
+  public String getCode() {
+    return code;
+  }
+
+  @Override
+  public String getDescription() {
+    return description;
+  }
+}
diff --git a/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/format/KafkaDeserializationSchema.java b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/format/KafkaDeserializationSchema.java
new file mode 100644
index 000000000..e5c23b359
--- /dev/null
+++ b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/format/KafkaDeserializationSchema.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.kafka.format;
+
+import com.bytedance.bitsail.base.format.DeserializationSchema;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.row.Row;
+import com.bytedance.bitsail.common.typeinfo.RowTypeInfo;
+import com.bytedance.bitsail.component.format.json.JsonRowDeserializationSchema;
+
+public class KafkaDeserializationSchema implements DeserializationSchema<byte[], Row> {
+
+  private BitSailConfiguration deserializationConfiguration;
+
+  private transient JsonRowDeserializationSchema deserializationSchema;
+
+  public KafkaDeserializationSchema(BitSailConfiguration deserializationConfiguration,
+                                    RowTypeInfo rowTypeInfo) {
+    this.deserializationConfiguration = deserializationConfiguration;
+    //todo spi.
+    this.deserializationSchema = new JsonRowDeserializationSchema(deserializationConfiguration, rowTypeInfo);
+  }
+
+  @Override
+  public Row deserialize(byte[] message) {
+    return deserializationSchema.deserialize(message);
+  }
+
+  @Override
+  public boolean isEndOfStream(Row nextElement) {
+    return false;
+  }
+}
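For reference, a minimal sketch of how this schema is driven, assuming BitSail's `TypeInfos` factory and a `RowTypeInfo(String[], TypeInfo<?>[])` constructor; the field names and payload below are illustrative only:

```java
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.row.Row;
import com.bytedance.bitsail.common.typeinfo.RowTypeInfo;
import com.bytedance.bitsail.common.typeinfo.TypeInfo;
import com.bytedance.bitsail.common.typeinfo.TypeInfos;
import com.bytedance.bitsail.connector.kafka.format.KafkaDeserializationSchema;

import java.nio.charset.StandardCharsets;

public class KafkaDeserializationSchemaSketch {
  public static void main(String[] args) {
    // Declare the row shape the JSON payload should be parsed into.
    RowTypeInfo rowTypeInfo = new RowTypeInfo(
        new String[] {"ID", "name", "DATE"},
        new TypeInfo<?>[] {TypeInfos.LONG_TYPE_INFO, TypeInfos.STRING_TYPE_INFO, TypeInfos.LONG_TYPE_INFO});

    KafkaDeserializationSchema schema =
        new KafkaDeserializationSchema(BitSailConfiguration.newDefault(), rowTypeInfo);

    // One Kafka record value in, one BitSail Row out.
    byte[] message = "{\"ID\":1,\"name\":\"text_1\",\"DATE\":1672531200000}"
        .getBytes(StandardCharsets.UTF_8);
    Row row = schema.deserialize(message);
    System.out.println(row);
  }
}
```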
diff --git a/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/option/KafkaSourceOptions.java b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/option/KafkaSourceOptions.java
new file mode 100644
index 000000000..d1af9e78f
--- /dev/null
+++ b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/option/KafkaSourceOptions.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.kafka.option;
+
+import com.bytedance.bitsail.common.option.ConfigOption;
+import com.bytedance.bitsail.common.option.ConfigOptions;
+import com.bytedance.bitsail.common.option.ReaderOptions;
+
+import com.alibaba.fastjson.TypeReference;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static com.bytedance.bitsail.common.option.ReaderOptions.READER_PREFIX;
+import static com.bytedance.bitsail.connector.kafka.constants.KafkaConstants.CONSUMER_OFFSET_LATEST_KEY;
+import static com.bytedance.bitsail.connector.kafka.constants.KafkaConstants.JSON_FORMAT;
+
+public interface KafkaSourceOptions extends ReaderOptions.BaseReaderOptions {
+
+  ConfigOption<String> BOOTSTRAP_SERVERS =
+      ConfigOptions.key(READER_PREFIX + "bootstrap_servers")
+          .noDefaultValue(String.class);
+
+  ConfigOption<String> TOPIC =
+      ConfigOptions.key(READER_PREFIX + "topic")
+          .noDefaultValue(String.class);
+
+  ConfigOption<String> CONSUMER_GROUP =
+      ConfigOptions.key(READER_PREFIX + "consumer_group")
+          .noDefaultValue(String.class);
+
+  ConfigOption<Boolean> COMMIT_IN_CHECKPOINT =
+      ConfigOptions.key(READER_PREFIX + "commit_in_checkpoint")
+          .defaultValue(false);
+
+  ConfigOption<Long> DISCOVERY_INTERNAL =
+      ConfigOptions.key(READER_PREFIX + "discovery_internal_ms")
+          .defaultValue(TimeUnit.MINUTES.toMillis(5L));
+
+  ConfigOption<String> STARTUP_MODE =
+      ConfigOptions.key(READER_PREFIX + "startup_mode")
+          .defaultValue(CONSUMER_OFFSET_LATEST_KEY);
+
+  ConfigOption<String> FORMAT_TYPE =
+      ConfigOptions.key(READER_PREFIX + "format_type")
+          .defaultValue(JSON_FORMAT);
+
+  ConfigOption<Long> STARTUP_MODE_TIMESTAMP =
+      ConfigOptions.key(READER_PREFIX + "startup_mode_timestamp")
+          .noDefaultValue(Long.class);
+
+  ConfigOption<String> CLIENT_ID_PREFIX =
+      ConfigOptions.key(READER_PREFIX + "client_id_prefix")
+          .noDefaultValue(String.class);
+
+  ConfigOption<Map<String, String>> PROPERTIES =
+      ConfigOptions.key(READER_PREFIX + "properties")
+          .onlyReference(new TypeReference<Map<String, String>>() {
+          });
+
+  ConfigOption<Integer> POLL_BATCH_SIZE =
+      ConfigOptions.key(READER_PREFIX + "poll_batch_size")
+          .defaultValue(2048);
+
+  ConfigOption<Long> POLL_TIMEOUT =
+      ConfigOptions.key(READER_PREFIX + "poll_timeout")
+          .defaultValue(TimeUnit.MINUTES.toMillis(1L));
+}
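A sketch of how these options resolve against a job configuration; with `READER_PREFIX` expanding to `job.reader.`, `BOOTSTRAP_SERVERS` reads the key `job.reader.bootstrap_servers`. The broker address and topic are placeholders:

```java
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.kafka.constants.KafkaConstants;
import com.bytedance.bitsail.connector.kafka.option.KafkaSourceOptions;

public class KafkaOptionsSketch {
  public static void main(String[] args) {
    BitSailConfiguration jobConf = BitSailConfiguration.newDefault();
    jobConf.set(KafkaSourceOptions.BOOTSTRAP_SERVERS, "localhost:9092");
    jobConf.set(KafkaSourceOptions.TOPIC, "testTopic");
    jobConf.set(KafkaSourceOptions.CONSUMER_GROUP, "test_consumer");
    jobConf.set(KafkaSourceOptions.STARTUP_MODE, KafkaConstants.CONSUMER_OFFSET_EARLIEST_KEY);

    // Options that were never set fall back to their declared defaults.
    long pollTimeoutMs = jobConf.get(KafkaSourceOptions.POLL_TIMEOUT);   // 60000
    int pollBatchSize = jobConf.get(KafkaSourceOptions.POLL_BATCH_SIZE); // 2048
    System.out.println(pollTimeoutMs + " / " + pollBatchSize);
  }
}
```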
diff --git a/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/KafkaSource.java b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/KafkaSource.java
new file mode 100644
index 000000000..a35906ac7
--- /dev/null
+++ b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/KafkaSource.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.kafka.source;
+
+import com.bytedance.bitsail.base.connector.reader.v1.Boundedness;
+import com.bytedance.bitsail.base.connector.reader.v1.Source;
+import com.bytedance.bitsail.base.connector.reader.v1.SourceReader;
+import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator;
+import com.bytedance.bitsail.base.execution.ExecutionEnviron;
+import com.bytedance.bitsail.base.execution.Mode;
+import com.bytedance.bitsail.base.extension.ParallelismComputable;
+import com.bytedance.bitsail.base.parallelism.ParallelismAdvice;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.option.CommonOptions;
+import com.bytedance.bitsail.common.row.Row;
+import com.bytedance.bitsail.connector.kafka.source.coordinator.KafkaSourceSplitCoordinator;
+import com.bytedance.bitsail.connector.kafka.source.reader.KafkaSourceReader;
+import com.bytedance.bitsail.connector.kafka.source.split.KafkaSplit;
+import com.bytedance.bitsail.connector.kafka.source.split.KafkaState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static com.bytedance.bitsail.connector.kafka.constants.KafkaConstants.CONNECTOR_TYPE_VALUE_KAFKA;
+
+public class KafkaSource implements Source<Row, KafkaSplit, KafkaState>, ParallelismComputable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
+
+  private BitSailConfiguration readerConfiguration;
+
+  private BitSailConfiguration commonConfiguration;
+
+  @Override
+  public void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) throws IOException {
+    this.readerConfiguration = readerConfiguration;
+    this.commonConfiguration = execution.getCommonConfiguration();
+  }
+
+  @Override
+  public Boundedness getSourceBoundedness() {
+    return Mode.BATCH.equals(Mode.getJobRunMode(commonConfiguration.get(CommonOptions.JOB_TYPE))) ?
+        Boundedness.BOUNDEDNESS :
+        Boundedness.UNBOUNDEDNESS;
+  }
+
+  @Override
+  public SourceReader<Row, KafkaSplit> createReader(SourceReader.Context readerContext) {
+    return new KafkaSourceReader(
+        this.readerConfiguration,
+        readerContext,
+        getSourceBoundedness()
+    );
+  }
+
+  @Override
+  public SourceSplitCoordinator<KafkaSplit, KafkaState> createSplitCoordinator(SourceSplitCoordinator.Context<KafkaSplit, KafkaState> coordinatorContext) {
+    return new KafkaSourceSplitCoordinator(
+        coordinatorContext,
+        this.readerConfiguration,
+        getSourceBoundedness()
+    );
+  }
+
+  @Override
+  public String getReaderName() {
+    return CONNECTOR_TYPE_VALUE_KAFKA;
+  }
+
+  @Override
+  public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf, BitSailConfiguration selfConf, ParallelismAdvice upstreamAdvice) throws Exception {
+    return ParallelismAdvice.builder()
+        .adviceParallelism(1)
+        .enforceDownStreamChain(true)
+        .build();
+  }
+}
diff --git a/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/coordinator/FairKafkaSplitAssigner.java b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/coordinator/FairKafkaSplitAssigner.java
new file mode 100644
index 000000000..86f6269fb
--- /dev/null
+++ b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/coordinator/FairKafkaSplitAssigner.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.kafka.source.coordinator;
+
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.connector.base.source.split.SplitAssigner;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class FairKafkaSplitAssigner implements SplitAssigner<TopicPartition> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FairKafkaSplitAssigner.class);
+
+  private BitSailConfiguration readerConfiguration;
+
+  private AtomicInteger atomicInteger;
+
+  public Map<TopicPartition, String> kafkaSplitIncrementMapping;
+
+  public FairKafkaSplitAssigner(BitSailConfiguration readerConfiguration,
+                                Map<TopicPartition, String> kafkaSplitIncrementMapping) {
+    this.readerConfiguration = readerConfiguration;
+    this.kafkaSplitIncrementMapping = kafkaSplitIncrementMapping;
+    this.atomicInteger = new AtomicInteger(CollectionUtils
+        .size(kafkaSplitIncrementMapping.keySet()));
+  }
+
+  @Override
+  public String assignSplitId(TopicPartition topicPartition) {
+    if (!kafkaSplitIncrementMapping.containsKey(topicPartition)) {
+      kafkaSplitIncrementMapping.put(topicPartition, String.valueOf(atomicInteger.getAndIncrement()));
+    }
+    return kafkaSplitIncrementMapping.get(topicPartition);
+  }
+
+  @Override
+  public int assignToReader(String splitId, int totalParallelism) {
+    return (splitId.hashCode() & Integer.MAX_VALUE) % totalParallelism;
+  }
+}
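A worked example of the assigner's two mappings, assuming the constructor signature in this diff; the values in comments follow directly from the code above:

```java
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.kafka.source.coordinator.FairKafkaSplitAssigner;

import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;

public class FairAssignerSketch {
  public static void main(String[] args) {
    FairKafkaSplitAssigner assigner = new FairKafkaSplitAssigner(
        BitSailConfiguration.newDefault(), new HashMap<>());

    TopicPartition tp = new TopicPartition("testTopic", 0);
    String splitId = assigner.assignSplitId(tp); // "0": first partition seen
    assigner.assignSplitId(tp);                  // still "0": the mapping is sticky

    // "0".hashCode() is 48, so with parallelism 4 this split pins to reader 48 % 4 = 0.
    int readerIndex = assigner.assignToReader(splitId, 4);
    System.out.println(splitId + " -> reader " + readerIndex);
  }
}
```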
diff --git a/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/coordinator/KafkaSourceSplitCoordinator.java b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/coordinator/KafkaSourceSplitCoordinator.java
new file mode 100644
index 000000000..794436b8f
--- /dev/null
+++ b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/coordinator/KafkaSourceSplitCoordinator.java
@@ -0,0 +1,309 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.kafka.source.coordinator;
+
+import com.bytedance.bitsail.base.connector.reader.v1.Boundedness;
+import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator;
+import com.bytedance.bitsail.common.BitSailException;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.exception.CommonErrorCode;
+import com.bytedance.bitsail.connector.base.source.split.SplitAssigner;
+import com.bytedance.bitsail.connector.kafka.constants.KafkaConstants;
+import com.bytedance.bitsail.connector.kafka.error.KafkaErrorCode;
+import com.bytedance.bitsail.connector.kafka.option.KafkaSourceOptions;
+import com.bytedance.bitsail.connector.kafka.source.split.KafkaSplit;
+import com.bytedance.bitsail.connector.kafka.source.split.KafkaState;
+import com.bytedance.bitsail.connector.kafka.util.KafkaUtils;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+import static com.bytedance.bitsail.connector.kafka.constants.KafkaConstants.CONSUMER_OFFSET_TIMESTAMP_KEY;
+
+public class KafkaSourceSplitCoordinator implements SourceSplitCoordinator<KafkaSplit, KafkaState> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceSplitCoordinator.class);
+
+  private final SourceSplitCoordinator.Context<KafkaSplit, KafkaState> context;
+  private final BitSailConfiguration jobConfiguration;
+  private final Boundedness boundedness;
+
+  private final Set<TopicPartition> discoveredPartitions;
+  private final Map<TopicPartition, String> assignedPartitions;
+  private final Map<Integer, Set<KafkaSplit>> pendingKafkaSplitAssignment;
+  private final long discoveryInternal;
+  private final Properties properties = new Properties();
+
+  private String bootstrapServers;
+  private String topic;
+  private String consumerGroup;
+  private String startupMode;
+  private long consumerOffsetTimestamp;
+  private Map<TopicPartition, Long> consumerStopOffset;
+
+  private transient SplitAssigner<TopicPartition> splitAssigner;
+  private transient Consumer<String, String> consumer;
+
+  public KafkaSourceSplitCoordinator(
+      SourceSplitCoordinator.Context<KafkaSplit, KafkaState> context,
+      BitSailConfiguration jobConfiguration,
+      Boundedness boundedness) {
+    this.context = context;
+    this.jobConfiguration = jobConfiguration;
+    this.boundedness = boundedness;
+    this.discoveryInternal = jobConfiguration.get(KafkaSourceOptions.DISCOVERY_INTERNAL);
+    // PROPERTIES has no default; guard against a null map before copying it.
+    Map<String, String> userProperties = jobConfiguration.get(KafkaSourceOptions.PROPERTIES);
+    if (Objects.nonNull(userProperties)) {
+      this.properties.putAll(userProperties);
+    }
+    this.pendingKafkaSplitAssignment = Maps.newConcurrentMap();
+    // Stop offsets default to Long.MAX_VALUE; initialize the map so getEndOffset never sees null.
+    this.consumerStopOffset = Maps.newHashMap();
+
+    this.discoveredPartitions = new HashSet<>();
+    if (context.isRestored()) {
+      KafkaState restoreState = context.getRestoreState();
+      assignedPartitions = restoreState.getAssignedWithSplitsIds();
+      discoveredPartitions.addAll(assignedPartitions.keySet());
+    } else {
+      assignedPartitions = Maps.newHashMap();
+    }
+
+    prepareConsumerProperties();
+  }
+
+  @Override
+  public void start() {
+    prepareKafkaConsumer();
+    splitAssigner = new FairKafkaSplitAssigner(jobConfiguration, assignedPartitions);
+    if (discoveryInternal > 0) {
+      context.runAsync(
+          this::fetchTopicPartitions,
+          this::handleTopicPartitionChanged,
+          0,
+          discoveryInternal
+      );
+    } else {
+      context.runAsyncOnce(
+          this::fetchTopicPartitions,
+          this::handleTopicPartitionChanged
+      );
+    }
+  }
+
+  @Override
+  public void addReader(int subtaskId) {
+    LOG.info(
+        "Adding reader {} to Kafka Split Coordinator for consumer group {}.",
+        subtaskId,
+        consumerGroup);
+    notifyReaderAssignmentResult();
+  }
+
+  @Override
+  public void addSplitsBack(List<KafkaSplit> splits, int subtaskId) {
+    LOG.info("Source reader {} return splits {}.", subtaskId, splits);
+    addSplitChangeToPendingAssignment(new HashSet<>(splits));
+    notifyReaderAssignmentResult();
+  }
+
+  @Override
+  public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+    // empty
+  }
+
+  @Override
+  public KafkaState snapshotState() throws Exception {
+    return new KafkaState(assignedPartitions);
+  }
+
+  @Override
+  public void close() {
+    if (consumer != null) {
+      consumer.close();
+    }
+  }
+
+  // TODO: add more check
+  private void prepareConsumerProperties() {
+    this.bootstrapServers = jobConfiguration.get(KafkaSourceOptions.BOOTSTRAP_SERVERS);
+    this.topic = jobConfiguration.get(KafkaSourceOptions.TOPIC);
+    this.consumerGroup = jobConfiguration.get(KafkaSourceOptions.CONSUMER_GROUP);
+    this.startupMode = jobConfiguration.get(KafkaSourceOptions.STARTUP_MODE);
+    if (StringUtils.equalsIgnoreCase(startupMode, CONSUMER_OFFSET_TIMESTAMP_KEY)) {
+      consumerOffsetTimestamp = jobConfiguration.get(KafkaSourceOptions.STARTUP_MODE_TIMESTAMP);
+    }
+  }
+
+  private void prepareKafkaConsumer() {
+    try {
+      consumer = KafkaUtils.prepareKafkaConsumer(jobConfiguration, properties);
+    } catch (Exception e) {
+      throw BitSailException.asBitSailException(KafkaErrorCode.CONSUMER_CREATE_FAILED, e);
+    }
+  }
+
+  private Set<KafkaSplit> fetchTopicPartitions() {
+    String[] splits = this.topic.split(",");
+    List<TopicPartition> allPartitionForTopic = getAllPartitionForTopic(Arrays.asList(splits));
+    Collection<TopicPartition> fetchedTopicPartitions = Sets.newHashSet(allPartitionForTopic);
+
+    discoveredPartitions.addAll(fetchedTopicPartitions);
+    consumer.assign(fetchedTopicPartitions);
+
+    Set<KafkaSplit> pendingAssignedPartitions = Sets.newHashSet();
+    for (TopicPartition topicPartition : fetchedTopicPartitions) {
+      if (assignedPartitions.containsKey(topicPartition)) {
+        continue;
+      }
+
+      pendingAssignedPartitions.add(
+          KafkaSplit.builder()
+              .topicPartition(topicPartition)
+              .startOffset(getStartOffset(topicPartition))
+              .endOffset(getEndOffset(topicPartition))
+              .splitId(splitAssigner.assignSplitId(topicPartition))
+              .build()
+      );
+    }
+    return pendingAssignedPartitions;
+  }
+
+  private long getEndOffset(TopicPartition topicPartition) {
+    return consumerStopOffset.getOrDefault(topicPartition,
+        KafkaConstants.CONSUMER_STOPPING_OFFSET);
+  }
+
+  private long getStartOffset(TopicPartition topicPartition) {
+    switch (startupMode) {
+      case KafkaConstants.CONSUMER_OFFSET_EARLIEST_KEY:
+        consumer.seekToBeginning(Collections.singletonList(topicPartition));
+        return consumer.position(topicPartition);
+      case KafkaConstants.CONSUMER_OFFSET_LATEST_KEY:
+        consumer.seekToEnd(Collections.singletonList(topicPartition));
+        return consumer.position(topicPartition);
+      case CONSUMER_OFFSET_TIMESTAMP_KEY:
+        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
+        timestampsToSearch.put(topicPartition, consumerOffsetTimestamp);
+        // Seek to the offset matched for the timestamp; fall back to the end offset
+        // when no record is as new as the requested timestamp.
+        OffsetAndTimestamp offsetAndTimestamp = consumer.offsetsForTimes(timestampsToSearch)
+            .get(topicPartition);
+        if (Objects.nonNull(offsetAndTimestamp)) {
+          consumer.seek(topicPartition, offsetAndTimestamp.offset());
+        } else {
+          consumer.seekToEnd(Collections.singletonList(topicPartition));
+        }
+        return consumer.position(topicPartition);
+      default:
+        throw BitSailException.asBitSailException(
+            KafkaErrorCode.CONSUMER_FETCH_OFFSET_FAILED,
+            String.format("Consumer startup mode = %s not support right now.", startupMode));
+    }
+  }
+
+  private List<TopicPartition> getAllPartitionForTopic(List<String> topics) {
+    final List<TopicPartition> partitions = new LinkedList<>();
+
+    for (String topic : topics) {
+      List<PartitionInfo> kafkaPartitions = this.consumer.partitionsFor(topic);
+
+      if (Objects.isNull(kafkaPartitions)) {
+        throw new BitSailException(KafkaErrorCode.TOPIC_NOT_EXISTS,
+            String.format(
+                "Could not fetch partitions for %s. Make sure that the topic exists.",
+                topic));
+      }
+
+      for (PartitionInfo partitionInfo : kafkaPartitions) {
+        partitions.add(
+            new TopicPartition(partitionInfo.topic(), partitionInfo.partition())
+        );
+      }
+    }
+    return partitions;
+  }
+
+  private List<String> getAllTopics() {
+    Set<String> topics = this.consumer.listTopics().keySet();
+    return Lists.newArrayList(topics);
+  }
+
+  private void handleTopicPartitionChanged(Set<KafkaSplit> pendingAssignedSplits,
+                                           Throwable throwable) {
+    if (throwable != null) {
+      throw BitSailException.asBitSailException(
+          CommonErrorCode.INTERNAL_ERROR,
+          String.format("Failed to fetch kafka offset for the topic: %s", topic), throwable);
+    }
+
+    if (CollectionUtils.isEmpty(pendingAssignedSplits)) {
+      return;
+    }
+    addSplitChangeToPendingAssignment(pendingAssignedSplits);
+    notifyReaderAssignmentResult();
+  }
+
+  private void notifyReaderAssignmentResult() {
+    Map<Integer, List<KafkaSplit>> tmpKafkaSplitAssignments = new HashMap<>();
+
+    for (Integer pendingAssignmentReader : pendingKafkaSplitAssignment.keySet()) {
+      if (CollectionUtils.isNotEmpty(pendingKafkaSplitAssignment.get(pendingAssignmentReader))
+          && context.registeredReaders().contains(pendingAssignmentReader)) {
+        tmpKafkaSplitAssignments.put(pendingAssignmentReader, Lists.newArrayList(pendingKafkaSplitAssignment.get(pendingAssignmentReader)));
+      }
+    }
+
+    for (Integer pendingAssignmentReader : tmpKafkaSplitAssignments.keySet()) {
+      LOG.info("Assigning splits to reader {}, splits = {}.", pendingAssignmentReader,
+          tmpKafkaSplitAssignments.get(pendingAssignmentReader));
+
+      context.assignSplit(pendingAssignmentReader,
+          tmpKafkaSplitAssignments.get(pendingAssignmentReader));
+      Set<KafkaSplit> removes = pendingKafkaSplitAssignment.remove(pendingAssignmentReader);
+      removes.forEach(removeSplit -> {
+        assignedPartitions.put(removeSplit.getTopicPartition(), removeSplit.getSplitId());
+      });
+      LOG.info("Assigned splits to reader {}", pendingAssignmentReader);
+
+      if (Boundedness.BOUNDEDNESS == boundedness) {
+        LOG.info("Signal reader {} no more splits assigned in future.", pendingAssignmentReader);
+        context.signalNoMoreSplits(pendingAssignmentReader);
+      }
+    }
+  }
+
+  private synchronized void addSplitChangeToPendingAssignment(Set<KafkaSplit> newKafkaSplits) {
+    int numReader = context.totalParallelism();
+    for (KafkaSplit split : newKafkaSplits) {
+      int readerIndex = splitAssigner.assignToReader(split.getSplitId(), numReader);
+      pendingKafkaSplitAssignment.computeIfAbsent(readerIndex, r -> new HashSet<>())
+          .add(split);
+    }
+    LOG.debug("Kafka splits {} finished assignment.", newKafkaSplits);
+  }
+}
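A standalone sketch of the timestamp startup mode fixed above: `offsetsForTimes()` returns the earliest offset whose record timestamp is at or after the queried timestamp, or null when the timestamp is newer than everything in the partition. The broker address, topic, and timestamp are placeholders:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class TimestampOffsetSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      TopicPartition tp = new TopicPartition("testTopic", 0);
      consumer.assign(Collections.singletonList(tp));

      OffsetAndTimestamp hit = consumer
          .offsetsForTimes(Collections.singletonMap(tp, 1672531200000L))
          .get(tp);
      if (hit != null) {
        consumer.seek(tp, hit.offset());                   // start from the matched offset
      } else {
        consumer.seekToEnd(Collections.singletonList(tp)); // nothing newer: start at the end
      }
      System.out.println("start offset = " + consumer.position(tp));
    }
  }
}
```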
diff --git a/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/reader/KafkaSourceReader.java b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/reader/KafkaSourceReader.java
new file mode 100644
index 000000000..1cf59d77c
--- /dev/null
+++ b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/reader/KafkaSourceReader.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.kafka.source.reader;
+
+import com.bytedance.bitsail.base.connector.reader.v1.Boundedness;
+import com.bytedance.bitsail.base.connector.reader.v1.SourcePipeline;
+import com.bytedance.bitsail.base.connector.reader.v1.SourceReader;
+import com.bytedance.bitsail.base.format.DeserializationSchema;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.row.Row;
+import com.bytedance.bitsail.connector.kafka.format.KafkaDeserializationSchema;
+import com.bytedance.bitsail.connector.kafka.option.KafkaSourceOptions;
+import com.bytedance.bitsail.connector.kafka.source.split.KafkaSplit;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+public class KafkaSourceReader implements SourceReader<Row, KafkaSplit> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceReader.class);
+
+  private final String bootstrapServer;
+  private final String topic;
+  private final String consumerGroup;
+  private final int pollBatchSize;
+  private final long pollTimeout;
+  private final Boundedness boundedness;
+  private final boolean commitInCheckpoint;
+
+  private final BitSailConfiguration readerConfiguration;
+  private final transient Context context;
+  private final transient Set<KafkaSplit> assignedKafkaSplits;
+  private final transient Set<KafkaSplit> finishedKafkaSplits;
+  private final transient DeserializationSchema<byte[], Row> deserializationSchema;
+  private transient boolean noMoreSplits;
+  private transient Consumer<byte[], byte[]> consumer;
+
+  public KafkaSourceReader(BitSailConfiguration readerConfiguration,
+                           Context context,
+                           Boundedness boundedness) {
+    this.readerConfiguration = readerConfiguration;
+    this.boundedness = boundedness;
+    this.context = context;
+    this.assignedKafkaSplits = Sets.newHashSet();
+    this.finishedKafkaSplits = Sets.newHashSet();
+    this.deserializationSchema = new KafkaDeserializationSchema(
+        readerConfiguration,
+        context.getRowTypeInfo()
+    );
+    this.noMoreSplits = false;
+
+    this.bootstrapServer = readerConfiguration.get(KafkaSourceOptions.BOOTSTRAP_SERVERS);
+    this.topic = readerConfiguration.get(KafkaSourceOptions.TOPIC);
+    this.consumerGroup = readerConfiguration.get(KafkaSourceOptions.CONSUMER_GROUP);
+    this.pollBatchSize = readerConfiguration.get(KafkaSourceOptions.POLL_BATCH_SIZE);
+    this.pollTimeout = readerConfiguration.get(KafkaSourceOptions.POLL_TIMEOUT);
+    this.commitInCheckpoint = readerConfiguration.get(KafkaSourceOptions.COMMIT_IN_CHECKPOINT);
+  }
+
+  @Override
+  public void start() {
+    Properties properties = new Properties();
+    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServer);
+    // Use byte[] deserializers so record values match DeserializationSchema<byte[], Row>.
+    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.consumerGroup);
+
+    consumer = new KafkaConsumer<>(properties);
+  }
+
+  @Override
+  public void pollNext(SourcePipeline<Row> pipeline) throws Exception {
+    for (KafkaSplit kafkaSplit : assignedKafkaSplits) {
+      TopicPartition topicPartition = kafkaSplit.getTopicPartition();
+      consumer.assign(Collections.singletonList(topicPartition));
+      ConsumerRecords<byte[], byte[]> pullResult = consumer.poll(pollTimeout);
+      List<ConsumerRecord<byte[], byte[]>> records = pullResult.records(topicPartition);
+
+      if (Objects.isNull(records) || CollectionUtils.isEmpty(records)) {
+        continue;
+      }
+
+      for (ConsumerRecord<byte[], byte[]> record : records) {
+        Row deserialize = deserializationSchema.deserialize(record.value());
+        pipeline.output(deserialize);
+        // The stopping offset is exclusive; Long.MAX_VALUE means the split is unbounded.
+        if (record.offset() + 1 >= kafkaSplit.getEndOffset()) {
+          LOG.info("Subtask {} kafka split {} in end of stream.",
+              context.getIndexOfSubtask(),
+              kafkaSplit);
+          finishedKafkaSplits.add(kafkaSplit);
+          break;
+        }
+      }
+      // Resume from the offset after the last record emitted from this batch.
+      kafkaSplit.setStartOffset(records.get(records.size() - 1).offset() + 1);
+      // if (!commitInCheckpoint) {
+      //   consumer.seek(topicPartition, );
+      // }
+    }
+    assignedKafkaSplits.removeAll(finishedKafkaSplits);
+  }
+
+  @Override
+  public void addSplits(List<KafkaSplit> splits) {
+    LOG.info("Subtask {} received {}(s) new splits, splits = {}.",
+        context.getIndexOfSubtask(),
+        CollectionUtils.size(splits),
+        splits);
+
+    assignedKafkaSplits.addAll(splits);
+  }
+
+  @Override
+  public boolean hasMoreElements() {
+    if (Boundedness.UNBOUNDEDNESS == boundedness) {
+      return true;
+    }
+    if (noMoreSplits) {
+      return CollectionUtils.size(assignedKafkaSplits) != 0;
+    }
+    return true;
+  }
+
+  @Override
+  public List<KafkaSplit> snapshotState(long checkpointId) {
+    LOG.info("Subtask {} start snapshotting for checkpoint id = {}.", context.getIndexOfSubtask(), checkpointId);
+    if (commitInCheckpoint) {
+      for (KafkaSplit kafkaSplit : assignedKafkaSplits) {
+        consumer.seek(kafkaSplit.getTopicPartition(), kafkaSplit.getStartOffset());
+        LOG.debug("Subtask {} committed topic partition = {} in checkpoint id = {}.",
+            context.getIndexOfSubtask(),
+            kafkaSplit.getTopicPartition(),
+            checkpointId);
+      }
+    }
+    return Lists.newArrayList(assignedKafkaSplits);
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (Objects.nonNull(consumer)) {
+      consumer.close();
+      LOG.info("Subtask {} shutdown consumer.", context.getIndexOfSubtask());
+    }
+    LOG.info("Subtask {} closed.", context.getIndexOfSubtask());
+  }
+
+  @Override
+  public void notifyNoMoreSplits() {
+    LOG.info("Subtask {} received no more split signal.", context.getIndexOfSubtask());
+    noMoreSplits = true;
+  }
+}
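A sketch of the per-split assign/seek/poll cycle the reader runs, in plain kafka-clients; the broker address, topic, and start offset are placeholders:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ReaderPollSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      TopicPartition tp = new TopicPartition("testTopic", 0);
      consumer.assign(Collections.singletonList(tp)); // manual assignment, no group rebalance
      consumer.seek(tp, 42L);                         // resume from a checkpointed start offset

      long nextStart = 42L;
      for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1)).records(tp)) {
        // record.value() is what KafkaSourceReader hands to the DeserializationSchema.
        nextStart = record.offset() + 1;
      }
      System.out.println("next start offset = " + nextStart);
    }
  }
}
```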
diff --git a/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/split/KafkaSplit.java b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/split/KafkaSplit.java
new file mode 100644
index 000000000..204c1e9cb
--- /dev/null
+++ b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/split/KafkaSplit.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.kafka.source.split;
+
+import com.bytedance.bitsail.base.connector.reader.v1.SourceSplit;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.kafka.common.TopicPartition;
+
+@Builder
+@Getter
+public class KafkaSplit implements SourceSplit {
+
+  private TopicPartition topicPartition;
+
+  @Setter
+  private long startOffset;
+
+  private long endOffset;
+
+  private String splitId;
+
+  @Override
+  public String uniqSplitId() {
+    return splitId;
+  }
+
+  @Override
+  public String toString() {
+    return "KafkaSplit{" +
+        "topicPartition=" + topicPartition +
+        ", startOffset=" + startOffset +
+        ", endOffset=" + endOffset +
+        ", splitId='" + splitId + '\'' +
+        '}';
+  }
+}
diff --git a/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/split/KafkaState.java b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/split/KafkaState.java
new file mode 100644
index 000000000..898e900b4
--- /dev/null
+++ b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/source/split/KafkaState.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.kafka.source.split;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class KafkaState implements Serializable {
+
+  private final Map<TopicPartition, String> assignedWithSplitsIds;
+
+  public KafkaState(Map<TopicPartition, String> assignedWithSplitsIds) {
+    this.assignedWithSplitsIds = assignedWithSplitsIds;
+  }
+
+  public Map<TopicPartition, String> getAssignedWithSplitsIds() {
+    return this.assignedWithSplitsIds;
+  }
+}
diff --git a/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/util/KafkaUtils.java b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/util/KafkaUtils.java
new file mode 100644
index 000000000..085eeb3df
--- /dev/null
+++ b/bitsail-connectors/connector-kafka/src/main/java/com/bytedance/bitsail/connector/kafka/util/KafkaUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.kafka.util;
+
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.connector.kafka.option.KafkaSourceOptions;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import java.util.Properties;
+
+public class KafkaUtils {
+
+  public static AdminClient prepareKafkaAdminClient(BitSailConfiguration kafkaConfiguration, Properties properties) {
+
+    String bootstrapServers = kafkaConfiguration.get(KafkaSourceOptions.BOOTSTRAP_SERVERS);
+    String clientIdPrefix = kafkaConfiguration.get(KafkaSourceOptions.CLIENT_ID_PREFIX);
+
+    Properties props = new Properties();
+    props.putAll(properties);
+    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientIdPrefix);
+
+    return AdminClient.create(props);
+  }
+
+  public static Consumer<String, String> prepareKafkaConsumer(BitSailConfiguration kafkaConfiguration,
+                                                              Properties properties) {
+    String bootstrapServers = kafkaConfiguration.get(KafkaSourceOptions.BOOTSTRAP_SERVERS);
+    String consumerGroup = kafkaConfiguration.get(KafkaSourceOptions.CONSUMER_GROUP);
+
+    Properties props = new Properties();
+    props.putAll(properties);
+    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
+    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+    // The split coordinator assigns partitions manually via assign(); subscribing here
+    // would conflict with that, since subscribe() and assign() are mutually exclusive.
+    return new KafkaConsumer<>(props);
+  }
+}
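A usage sketch for `KafkaUtils.prepareKafkaAdminClient`; the configuration values are placeholders:

```java
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.kafka.option.KafkaSourceOptions;
import com.bytedance.bitsail.connector.kafka.util.KafkaUtils;

import org.apache.kafka.clients.admin.AdminClient;

import java.util.Properties;
import java.util.Set;

public class AdminClientSketch {
  public static void main(String[] args) throws Exception {
    BitSailConfiguration conf = BitSailConfiguration.newDefault();
    conf.set(KafkaSourceOptions.BOOTSTRAP_SERVERS, "localhost:9092");
    conf.set(KafkaSourceOptions.CLIENT_ID_PREFIX, "bitsail-coordinator-admin-client-0");

    try (AdminClient admin = KafkaUtils.prepareKafkaAdminClient(conf, new Properties())) {
      // listTopics().names() returns a KafkaFuture<Set<String>>; get() blocks for the result.
      Set<String> topics = admin.listTopics().names().get();
      System.out.println(topics);
    }
  }
}
```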
diff --git a/bitsail-connectors/connector-kafka/src/main/resources/bitsail-connector-unified-kafka.json b/bitsail-connectors/connector-kafka/src/main/resources/bitsail-connector-unified-kafka.json
new file mode 100644
index 000000000..ece6f3b4c
--- /dev/null
+++ b/bitsail-connectors/connector-kafka/src/main/resources/bitsail-connector-unified-kafka.json
@@ -0,0 +1,10 @@
+{
+  "name": "bitsail-connector-unified-kafka",
+  "classes": [
+    "com.bytedance.bitsail.connector.kafka.source.KafkaSource",
+    "com.bytedance.bitsail.connector.kafka.sink.KafkaSink"
+  ],
+  "libs": [
+    "connector-kafka-${version}.jar"
+  ]
+}
diff --git a/bitsail-connectors/connector-kafka/src/test/resources/kafka_to_print.json b/bitsail-connectors/connector-kafka/src/test/resources/kafka_to_print.json
new file mode 100644
index 000000000..53ba8398c
--- /dev/null
+++ b/bitsail-connectors/connector-kafka/src/test/resources/kafka_to_print.json
@@ -0,0 +1,40 @@
+{
+  "job": {
+    "common": {
+      "job_type": "STREAMING",
+      "job_plugin_lib_dir": "plugin",
+      "job_plugin_conf_dir": "plugin_conf",
+      "enable_dynamic_loader": true,
+      "instance_id": "1",
+      "internal_instance_id": "1",
+      "extra_properties": {
+        "update-mode": "append"
+      }
+    },
+    "reader": {
+      "bootstrap_servers": "PLAINTEXT://localhost:9092",
+      "topic": "testTopic",
+      "startup_mode": "earliest",
+      "consumer_group": "test_consumer",
+      "columns": [
+        {
+          "name": "ID",
+          "type": "long"
+        },
+        {
+          "name": "name",
+          "type": "string"
+        },
+        {
+          "name": "DATE",
+          "type": "long"
+        }
+      ],
+      "format_type": "json",
+      "class": "com.bytedance.bitsail.connector.kafka.source.KafkaSource"
+    },
+    "writer": {
+      "class": "com.bytedance.bitsail.connector.legacy.print.sink.PrintSink"
+    }
+  }
+}
diff --git a/bitsail-test/bitsail-test-integration/bitsail-test-integration-kafka/src/test/java/com/bytedance/bitsail/test/integration/kafka/KafkaSourceITCase.java b/bitsail-test/bitsail-test-integration/bitsail-test-integration-kafka/src/test/java/com/bytedance/bitsail/test/integration/kafka/KafkaSourceITCase.java
new file mode 100644
index 000000000..a5a78d6e0
--- /dev/null
+++ b/bitsail-test/bitsail-test-integration/bitsail-test-integration-kafka/src/test/java/com/bytedance/bitsail/test/integration/kafka/KafkaSourceITCase.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.test.integration.kafka;
+
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.test.integration.AbstractIntegrationTest;
+import com.bytedance.bitsail.test.integration.kafka.container.KafkaCluster;
+import com.bytedance.bitsail.test.integration.utils.JobConfUtils;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Maps;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Ignore
+public class KafkaSourceITCase extends AbstractIntegrationTest {
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceITCase.class);
+
+  private static final int TOTAL_SEND_COUNT = 5000;
+  private final String topicName = "testTopic";
+  private final KafkaCluster kafkaCluster = new KafkaCluster();
+
+  private static String constructARecord(int index) {
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put("ID", index);
+    // Use the same field name ("name") that kafka_to_print.json declares in its columns.
+    jsonObject.put("name", "text_" + index);
+    jsonObject.put("DATE", System.currentTimeMillis());
+    return jsonObject.toJSONString();
+  }
+
+  @Before
+  public void before() {
+    kafkaCluster.startService();
+    kafkaCluster.createTopic(topicName);
+    startSendDataToKafka();
+  }
+
+  private void startSendDataToKafka() {
+    KafkaProducer<String, String> producer = kafkaCluster.getProducer(topicName);
+    ScheduledThreadPoolExecutor produceService = new ScheduledThreadPoolExecutor(1);
+    AtomicInteger sendCount = new AtomicInteger(0);
+    produceService.scheduleAtFixedRate(() -> {
+      try {
+        for (int i = 0; i < TOTAL_SEND_COUNT; ++i) {
+          String record = constructARecord(sendCount.getAndIncrement());
+          producer.send(new ProducerRecord<>(topicName, record));
+        }
+      } catch (Exception e) {
+        LOG.error("failed to send a record", e);
+      } finally {
+        LOG.info(">>> kafka produce count: {}", sendCount.get());
+      }
+    }, 0, 1, TimeUnit.SECONDS);
+  }
+
+  @Test
+  public void testKafkaSource() throws Exception {
+    BitSailConfiguration configuration = JobConfUtils.fromClasspath("kafka_to_print.json");
+    updateConfiguration(configuration);
+    submitJob(configuration);
+  }
+
+  protected void updateConfiguration(BitSailConfiguration jobConfiguration) {
+    // jobConfiguration.set(FakeReaderOptions.TOTAL_COUNT, TOTAL_SEND_COUNT);
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaCluster.getBootstrapServer());
+    properties.put("topic", topicName);
+    jobConfiguration.set("job.reader.connector.connector", properties);
+  }
+
+  @After
+  public void after() {
+    kafkaCluster.stopService();
+  }
+}