Skip to content

WIP: [BitSail][Connector] Migrate Kafka legacy source connector to V1 inte… #393

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions bitsail-connectors/connector-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@
</properties>

<dependencies>

<dependency>
<groupId>com.bytedance.bitsail</groupId>
<artifactId>connector-base</artifactId>
<version>${revision}</version>
</dependency>

<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,16 @@ public class KafkaConstants {
public static final String DISABLE_CURRENT_OFFSET_RATE_METRICS = "disableCurrentOffsetsRateMetrics";
public static final int MAX_PARALLELISM = 5;
public static final int REQUEST_TIMEOUT_MS_CONFIG = 1200 * 1000;

public static final String JSON_FORMAT = "json";

public static final String CONSUMER_OFFSET_LATEST_KEY = "latest";

public static final String CONSUMER_OFFSET_EARLIEST_KEY = "earliest";

public static final String CONSUMER_OFFSET_TIMESTAMP_KEY = "timestamp";

public static final Long CONSUMER_STOPPING_OFFSET = Long.MAX_VALUE;

public static final String DEFAULT_CLIENT_ID = "-coordinator-admin-client-";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.kafka.error;

import com.bytedance.bitsail.common.exception.ErrorCode;

/**
 * Error codes raised by the Kafka connector.
 * Each entry pairs a stable code string ("Kafka-N") with a human-readable description.
 */
public enum KafkaErrorCode implements ErrorCode {

  TOPIC_NOT_EXISTS("Kafka-0", "Kafka fetch partitions failed."),
  CONSUMER_CREATE_FAILED("Kafka-1", "Kafka AdminClient create failed."),
  CONSUMER_FETCH_OFFSET_FAILED("Kafka-2", "Kafka AdminClient fetch offset failed."),
  CONSUMER_SEEK_OFFSET_FAILED("Kafka-3", "Kafka AdminClient seek offset failed.");

  // Kept public so any existing direct field access keeps compiling.
  public final String code;
  public final String description;

  KafkaErrorCode(String errorCode, String errorDescription) {
    this.code = errorCode;
    this.description = errorDescription;
  }

  @Override
  public String getCode() {
    return this.code;
  }

  @Override
  public String getDescription() {
    return this.description;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.kafka.format;

import com.bytedance.bitsail.base.format.DeserializationSchema;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.row.Row;
import com.bytedance.bitsail.common.typeinfo.RowTypeInfo;
import com.bytedance.bitsail.component.format.json.JsonRowDeserializationSchema;

/**
 * Deserialization schema for the Kafka source: turns raw Kafka record bytes
 * into BitSail {@link Row}s by delegating to a JSON row deserializer.
 */
public class KafkaDeserializationSchema implements DeserializationSchema<byte[], Row> {

  private final BitSailConfiguration deserializationConfiguration;

  // NOTE(review): transient means this field is null after Java deserialization;
  // confirm the framework re-creates the schema per task rather than shipping it serialized.
  private transient JsonRowDeserializationSchema deserializationSchema;

  public KafkaDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                    RowTypeInfo rowTypeInfo) {
    this.deserializationConfiguration = deserializationConfiguration;
    // todo spi: format selection is hard-wired to JSON for now.
    this.deserializationSchema = new JsonRowDeserializationSchema(deserializationConfiguration, rowTypeInfo);
  }

  @Override
  public Row deserialize(byte[] message) {
    return this.deserializationSchema.deserialize(message);
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    // Never signals end-of-stream from the deserialized element itself.
    return false;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.kafka.option;

import com.bytedance.bitsail.common.option.ConfigOption;
import com.bytedance.bitsail.common.option.ConfigOptions;
import com.bytedance.bitsail.common.option.ReaderOptions;

import com.alibaba.fastjson.TypeReference;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import static com.bytedance.bitsail.common.option.ReaderOptions.READER_PREFIX;
import static com.bytedance.bitsail.connector.kafka.constants.KafkaConstants.CONSUMER_OFFSET_LATEST_KEY;
import static com.bytedance.bitsail.connector.kafka.constants.KafkaConstants.JSON_FORMAT;

/**
 * Reader-side configuration options for the Kafka source connector.
 * All keys are built on top of {@code READER_PREFIX}.
 */
public interface KafkaSourceOptions extends ReaderOptions.BaseReaderOptions {

// Kafka bootstrap server list; required (no default).
ConfigOption<String> BOOTSTRAP_SERVERS =
ConfigOptions.key(READER_PREFIX + "bootstrap_servers")
.noDefaultValue(String.class);

// Topic to consume; required (no default).
ConfigOption<String> TOPIC =
ConfigOptions.key(READER_PREFIX + "topic")
.noDefaultValue(String.class);

// Consumer group id; required (no default).
ConfigOption<String> CONSUMER_GROUP =
ConfigOptions.key(READER_PREFIX + "consumer_group")
.noDefaultValue(String.class);

// Presumably controls committing offsets on checkpoint completion; defaults to false — confirm in reader.
ConfigOption<Boolean> COMMIT_IN_CHECKPOINT =
ConfigOptions.key(READER_PREFIX + "commit_in_checkpoint")
.defaultValue(false);

// NOTE(review): "internal" looks like a typo for "interval" (key is discovery_internal_ms);
// renaming the key now would break existing job configs. Default: 5 minutes in ms.
ConfigOption<Long> DISCOVERY_INTERNAL =
ConfigOptions.key(READER_PREFIX + "discovery_internal_ms")
.defaultValue(TimeUnit.MINUTES.toMillis(5L));

// Where to start consuming; defaults to "latest". KafkaConstants also defines
// "earliest" and "timestamp" keys — presumably the other accepted values.
ConfigOption<String> STARTUP_MODE =
ConfigOptions.key(READER_PREFIX + "startup_mode")
.defaultValue(CONSUMER_OFFSET_LATEST_KEY);

// Record format; defaults to "json".
ConfigOption<String> FORMAT_TYPE =
ConfigOptions.key(READER_PREFIX + "format_type")
.defaultValue(JSON_FORMAT);

// Timestamp used when startup_mode is "timestamp" — presumably epoch millis; confirm in coordinator.
ConfigOption<Long> STARTUP_MODE_TIMESTAMP =
ConfigOptions.key(READER_PREFIX + "startup_mode_timestamp")
.noDefaultValue(Long.class);

// Optional prefix for the Kafka client id.
ConfigOption<String> CLIENT_ID_PREFIX =
ConfigOptions.key(READER_PREFIX + "client_id_prefix")
.noDefaultValue(String.class);

// Free-form key/value properties — presumably passed through to the Kafka client; confirm in reader.
ConfigOption<Map<String, String>> PROPERTIES =
ConfigOptions.key(READER_PREFIX + "properties")
.onlyReference(new TypeReference<Map<String, String>>() {
});

// Maximum records handled per poll batch; defaults to 2048.
ConfigOption<Integer> POLL_BATCH_SIZE =
ConfigOptions.key(READER_PREFIX + "poll_batch_size")
.defaultValue(2048);

// Poll timeout; defaults to 1 minute in ms.
ConfigOption<Long> POLL_TIMEOUT =
ConfigOptions.key(READER_PREFIX + "poll_timeout")
.defaultValue(TimeUnit.MINUTES.toMillis(1L));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.kafka.source;

import com.bytedance.bitsail.base.connector.reader.v1.Boundedness;
import com.bytedance.bitsail.base.connector.reader.v1.Source;
import com.bytedance.bitsail.base.connector.reader.v1.SourceReader;
import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator;
import com.bytedance.bitsail.base.execution.ExecutionEnviron;
import com.bytedance.bitsail.base.execution.Mode;
import com.bytedance.bitsail.base.extension.ParallelismComputable;
import com.bytedance.bitsail.base.parallelism.ParallelismAdvice;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.common.option.CommonOptions;
import com.bytedance.bitsail.common.row.Row;
import com.bytedance.bitsail.connector.kafka.source.coordinator.KafkaSourceSplitCoordinator;
import com.bytedance.bitsail.connector.kafka.source.reader.KafkaSourceReader;
import com.bytedance.bitsail.connector.kafka.source.split.KafkaSplit;
import com.bytedance.bitsail.connector.kafka.source.split.KafkaState;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import static com.bytedance.bitsail.connector.kafka.constants.KafkaConstants.CONNECTOR_TYPE_VALUE_KAFKA;

/**
 * Kafka source on the V1 {@link Source} interface: wires up the split
 * coordinator and the per-task reader, and reports boundedness from the
 * job run mode.
 */
public class KafkaSource implements Source<Row, KafkaSplit, KafkaState>, ParallelismComputable {

  private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);

  private BitSailConfiguration readerConfiguration;

  private BitSailConfiguration commonConfiguration;

  @Override
  public void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) throws IOException {
    // Capture both the reader-scoped and the job-wide configuration.
    this.commonConfiguration = execution.getCommonConfiguration();
    this.readerConfiguration = readerConfiguration;
  }

  @Override
  public Boundedness getSourceBoundedness() {
    // Batch jobs read a bounded snapshot; anything else is treated as unbounded.
    Mode jobRunMode = Mode.getJobRunMode(commonConfiguration.get(CommonOptions.JOB_TYPE));
    if (Mode.BATCH.equals(jobRunMode)) {
      return Boundedness.BOUNDEDNESS;
    }
    return Boundedness.UNBOUNDEDNESS;
  }

  @Override
  public SourceReader<Row, KafkaSplit> createReader(SourceReader.Context readerContext) {
    return new KafkaSourceReader(readerConfiguration, readerContext, getSourceBoundedness());
  }

  @Override
  public SourceSplitCoordinator<KafkaSplit, KafkaState> createSplitCoordinator(SourceSplitCoordinator.Context<KafkaSplit, KafkaState> coordinatorContext) {
    return new KafkaSourceSplitCoordinator(coordinatorContext, readerConfiguration, getSourceBoundedness());
  }

  @Override
  public String getReaderName() {
    return CONNECTOR_TYPE_VALUE_KAFKA;
  }

  @Override
  public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf, BitSailConfiguration selfConf, ParallelismAdvice upstreamAdvice) throws Exception {
    // NOTE(review): parallelism is pinned to 1 and upstreamAdvice is ignored —
    // presumably a WIP placeholder; confirm before relying on it.
    return ParallelismAdvice.builder()
        .adviceParallelism(1)
        .enforceDownStreamChain(true)
        .build();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.bytedance.bitsail.connector.kafka.source.coordinator;

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.base.source.split.SplitAssigner;

import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Split assigner that hands every Kafka {@link TopicPartition} a stable,
 * monotonically increasing split id, and maps split ids onto readers by
 * hashing so assignment is deterministic across restarts.
 */
public class FairKafkaSplitAssigner implements SplitAssigner<TopicPartition> {

  private static final Logger LOG = LoggerFactory.getLogger(FairKafkaSplitAssigner.class);

  private final BitSailConfiguration readerConfiguration;

  // Next split id to hand out; seeded with the count of already-mapped partitions
  // so ids minted here do not collide with restored ones.
  private final AtomicInteger atomicInteger;

  // TopicPartition -> split id. Public field name (including the "kafKa" typo)
  // kept as-is for backward compatibility with any existing external access.
  public Map<TopicPartition, String> kafKaSplitIncrementMapping;

  /**
   * @param readerConfiguration         reader-scoped configuration (retained, not read here)
   * @param kafkaSplitIncrementMapping  pre-existing partition-to-split-id mapping; must be non-null
   */
  public FairKafkaSplitAssigner(BitSailConfiguration readerConfiguration,
                                Map<TopicPartition, String> kafkaSplitIncrementMapping) {
    this.readerConfiguration = readerConfiguration;
    this.kafKaSplitIncrementMapping = kafkaSplitIncrementMapping;
    // Map.size() replaces CollectionUtils.size(map.keySet()): behaviorally identical
    // (a null map threw NPE on .keySet() before as well) and drops the indirection.
    this.atomicInteger = new AtomicInteger(kafkaSplitIncrementMapping.size());
  }

  @Override
  public String assignSplitId(TopicPartition topicPartition) {
    // computeIfAbsent replaces the containsKey/put/get triple: one lookup instead
    // of three, and atomic on concurrent map implementations.
    return kafKaSplitIncrementMapping.computeIfAbsent(
        topicPartition, ignored -> String.valueOf(atomicInteger.getAndIncrement()));
  }

  @Override
  public int assignToReader(String splitId, int totalParallelism) {
    // Mask the sign bit so a negative hashCode cannot produce a negative reader index.
    return (splitId.hashCode() & Integer.MAX_VALUE) % totalParallelism;
  }
}
Loading