Skip to content

Commit

Permalink
[improve][fn] Implement pip 401: Support set batching configurations …
Browse files Browse the repository at this point in the history
…for Pulsar Functions&Sources (#23860)
  • Loading branch information
jiangpengcheng authored Mar 12, 2025
1 parent 114aaf0 commit fad925f
Show file tree
Hide file tree
Showing 17 changed files with 674 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.functions;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
public class BatchingConfig {
@Builder.Default
private boolean enabled = true;
@Builder.Default
private Integer batchingMaxPublishDelayMs = 10;
private Integer roundRobinRouterBatchingPartitionSwitchFrequency;
private Integer batchingMaxMessages;
private Integer batchingMaxBytes;
private String batchBuilder;
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ public class ProducerConfig {
private CryptoConfig cryptoConfig;
private String batchBuilder;
private CompressionType compressionType;
private BatchingConfig batchingConfig;
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import org.apache.pulsar.functions.source.SingleConsumerPulsarSource;
import org.apache.pulsar.functions.source.SingleConsumerPulsarSourceConfig;
import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
import org.apache.pulsar.functions.utils.BatchingUtils;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
Expand Down Expand Up @@ -1050,6 +1051,7 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
.batchBuilder(conf.getBatchBuilder())
.useThreadLocalProducers(conf.getUseThreadLocalProducers())
.cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()))
.batchingConfig(BatchingUtils.convertFromSpec(conf.getBatchingSpec()))
.compressionType(FunctionCommon.convertFromFunctionDetailsCompressionType(
conf.getCompressionType()));
pulsarSinkConfig.setProducerConfig(builder.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,35 @@ public <T> ProducerBuilder<T> createProducerBuilder(String topic, Schema<T> sche
builder.batcherBuilder(BatcherBuilder.DEFAULT);
}
}
if (producerConfig.getBatchingConfig() != null) {
builder.enableBatching(producerConfig.getBatchingConfig().isEnabled());
if (producerConfig.getBatchingConfig().getBatchingMaxPublishDelayMs() != null
&& producerConfig.getBatchingConfig().getBatchingMaxPublishDelayMs() > 0) {
builder.batchingMaxPublishDelay(producerConfig.getBatchingConfig().getBatchingMaxPublishDelayMs(),
TimeUnit.MILLISECONDS);
}
if (producerConfig.getBatchingConfig().getRoundRobinRouterBatchingPartitionSwitchFrequency() != null
&& producerConfig.getBatchingConfig().getRoundRobinRouterBatchingPartitionSwitchFrequency()
> 0) {
builder.roundRobinRouterBatchingPartitionSwitchFrequency(
producerConfig.getBatchingConfig().getRoundRobinRouterBatchingPartitionSwitchFrequency());
}
if (producerConfig.getBatchingConfig().getBatchingMaxMessages() != null
&& producerConfig.getBatchingConfig().getBatchingMaxMessages() > 0) {
builder.batchingMaxMessages(producerConfig.getBatchingConfig().getBatchingMaxMessages());
}
if (producerConfig.getBatchingConfig().getBatchingMaxBytes() != null
&& producerConfig.getBatchingConfig().getBatchingMaxBytes() > 0) {
builder.batchingMaxBytes(producerConfig.getBatchingConfig().getBatchingMaxBytes());
}
if (producerConfig.getBatchingConfig().getBatchBuilder() != null) {
if (producerConfig.getBatchingConfig().getBatchBuilder().equals("KEY_BASED")) {
builder.batcherBuilder(BatcherBuilder.KEY_BASED);
} else {
builder.batcherBuilder(BatcherBuilder.DEFAULT);
}
}
}
}
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.testng.Assert.assertEquals;
Expand All @@ -41,6 +42,7 @@
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.BatchingConfig;
import org.apache.pulsar.common.functions.CryptoConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.mockito.internal.util.MockUtil;
Expand Down Expand Up @@ -139,6 +141,62 @@ public void testCreateProducerBuilderWithAdvancedProducerConfig() {
cryptoConfig.setCryptoKeyReaderConfig(Map.of("key", "value"));
cryptoConfig.setCryptoKeyReaderClassName(TestCryptoKeyReader.class.getName());
producerConfig.setCryptoConfig(cryptoConfig);
BatchingConfig batchingConfig = new BatchingConfig();
batchingConfig.setEnabled(true);
batchingConfig.setBatchingMaxPublishDelayMs(20);
batchingConfig.setBatchingMaxMessages(100);
batchingConfig.setBatchingMaxBytes(-1);
producerConfig.setBatchingConfig(batchingConfig);
ProducerBuilderFactory builderFactory = new ProducerBuilderFactory(pulsarClient, producerConfig, null, null);
builderFactory.createProducerBuilder("topic", Schema.STRING, "producerName");

verify(pulsarClient).newProducer(Schema.STRING);
verify(producerBuilder).blockIfQueueFull(true);
// enableBatching will be called twice here:
// the first time is called by default to keep the backward compability
// the second call is called when the producerConfig and producerConfig.batchingConfig are not null
verify(producerBuilder, times(2)).enableBatching(true);
verify(producerBuilder).batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS);
verify(producerBuilder).hashingScheme(HashingScheme.Murmur3_32Hash);
verify(producerBuilder).messageRoutingMode(MessageRoutingMode.CustomPartition);
verify(producerBuilder).messageRouter(FunctionResultRouter.of());
verify(producerBuilder).sendTimeout(0, TimeUnit.SECONDS);
verify(producerBuilder).topic("topic");
verify(producerBuilder).producerName("producerName");

verify(producerBuilder).compressionType(CompressionType.SNAPPY);
verify(producerBuilder).batcherBuilder(BatcherBuilder.KEY_BASED);
verify(producerBuilder).maxPendingMessages(5000);
verify(producerBuilder).maxPendingMessagesAcrossPartitions(50000);
TestCryptoKeyReader lastInstance = TestCryptoKeyReader.LAST_INSTANCE;
assertNotNull(lastInstance);
assertEquals(lastInstance.configs, cryptoConfig.getCryptoKeyReaderConfig());
verify(producerBuilder).cryptoKeyReader(lastInstance);
verify(producerBuilder).cryptoFailureAction(ProducerCryptoFailureAction.FAIL);
verify(producerBuilder).addEncryptionKey("key1");
verify(producerBuilder).addEncryptionKey("key2");
verify(producerBuilder).batchingMaxPublishDelay(20, TimeUnit.MILLISECONDS);
verify(producerBuilder).batchingMaxMessages(100);
verifyNoMoreInteractions(producerBuilder);
}

@Test
public void testCreateProducerBuilderWithBatchingDisabled() {
ProducerConfig producerConfig = new ProducerConfig();
producerConfig.setBatchBuilder("KEY_BASED");
producerConfig.setCompressionType(CompressionType.SNAPPY);
producerConfig.setMaxPendingMessages(5000);
producerConfig.setMaxPendingMessagesAcrossPartitions(50000);
CryptoConfig cryptoConfig = new CryptoConfig();
cryptoConfig.setProducerCryptoFailureAction(ProducerCryptoFailureAction.FAIL);
cryptoConfig.setEncryptionKeys(new String[]{"key1", "key2"});
cryptoConfig.setCryptoKeyReaderConfig(Map.of("key", "value"));
cryptoConfig.setCryptoKeyReaderClassName(TestCryptoKeyReader.class.getName());
producerConfig.setCryptoConfig(cryptoConfig);
BatchingConfig batchingConfig = new BatchingConfig();
batchingConfig.setEnabled(false);
batchingConfig.setBatchingMaxPublishDelayMs(0);
producerConfig.setBatchingConfig(batchingConfig);
ProducerBuilderFactory builderFactory = new ProducerBuilderFactory(pulsarClient, producerConfig, null, null);
builderFactory.createProducerBuilder("topic", Schema.STRING, "producerName");
verifyCommon();
Expand All @@ -153,12 +211,14 @@ public void testCreateProducerBuilderWithAdvancedProducerConfig() {
verify(producerBuilder).cryptoFailureAction(ProducerCryptoFailureAction.FAIL);
verify(producerBuilder).addEncryptionKey("key1");
verify(producerBuilder).addEncryptionKey("key2");
verify(producerBuilder).enableBatching(false);
verifyNoMoreInteractions(producerBuilder);
}

public static class TestCryptoKeyReader implements CryptoKeyReader {
static TestCryptoKeyReader LAST_INSTANCE;
Map<String, Object> configs;

public TestCryptoKeyReader(Map<String, Object> configs) {
this.configs = configs;
assert LAST_INSTANCE == null;
Expand Down
10 changes: 10 additions & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ message ProducerSpec {
CryptoSpec cryptoSpec = 4;
string batchBuilder = 5;
CompressionType compressionType = 6;
BatchingSpec batchingSpec = 7;
}

message CryptoSpec {
Expand All @@ -147,6 +148,15 @@ message CryptoSpec {
FailureAction consumerCryptoFailureAction = 5;
}

message BatchingSpec {
bool enabled = 1;
int32 batchingMaxPublishDelayMs = 2;
int32 roundRobinRouterBatchingPartitionSwitchFrequency = 3;
int32 batchingMaxMessages = 4;
int32 batchingMaxBytes = 5;
string batchBuilder = 6;
}

message SourceSpec {
string className = 1;
// map in json format
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.utils;

import org.apache.pulsar.common.functions.BatchingConfig;
import org.apache.pulsar.functions.proto.Function;

public final class BatchingUtils {
public static Function.BatchingSpec convert(BatchingConfig config) {
if (config == null) {
return null;
}

Function.BatchingSpec.Builder builder = Function.BatchingSpec.newBuilder()
.setEnabled(config.isEnabled());

if (config.getBatchingMaxPublishDelayMs() != null && config.getBatchingMaxPublishDelayMs() > 0) {
builder.setBatchingMaxPublishDelayMs(config.getBatchingMaxPublishDelayMs());
}
if (config.getRoundRobinRouterBatchingPartitionSwitchFrequency() != null
&& config.getRoundRobinRouterBatchingPartitionSwitchFrequency() > 0) {
builder.setRoundRobinRouterBatchingPartitionSwitchFrequency(
config.getRoundRobinRouterBatchingPartitionSwitchFrequency());
}
if (config.getBatchingMaxMessages() != null && config.getBatchingMaxMessages() > 0) {
builder.setBatchingMaxMessages(config.getBatchingMaxMessages());
}
if (config.getBatchingMaxBytes() != null && config.getBatchingMaxBytes() > 0) {
builder.setBatchingMaxBytes(config.getBatchingMaxBytes());
}
if (config.getBatchBuilder() != null && !config.getBatchBuilder().isEmpty()) {
builder.setBatchBuilder(config.getBatchBuilder());
}

return builder.build();
}

public static BatchingConfig convertFromSpec(Function.BatchingSpec spec) {
// to keep the backward compatibility, when batchingSpec is null or empty
// the batching is enabled by default, and the default max publish delay is 10ms
if (spec == null || spec.toString().equals("")) {
return BatchingConfig.builder()
.enabled(true)
.batchingMaxPublishDelayMs(10)
.build();
}

BatchingConfig.BatchingConfigBuilder builder = BatchingConfig.builder()
.enabled(spec.getEnabled());

if (spec.getBatchingMaxPublishDelayMs() > 0) {
builder.batchingMaxPublishDelayMs(spec.getBatchingMaxPublishDelayMs());
}
if (spec.getRoundRobinRouterBatchingPartitionSwitchFrequency() > 0) {
builder.roundRobinRouterBatchingPartitionSwitchFrequency(
spec.getRoundRobinRouterBatchingPartitionSwitchFrequency());
}
if (spec.getBatchingMaxMessages() > 0) {
builder.batchingMaxMessages(spec.getBatchingMaxMessages());
}
if (spec.getBatchingMaxBytes() > 0) {
builder.batchingMaxBytes(spec.getBatchingMaxBytes());
}
if (spec.getBatchBuilder() != null && !spec.getBatchBuilder().isEmpty()) {
builder.batchBuilder(spec.getBatchBuilder());
}

return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,9 @@ public static Function.ProducerSpec convertProducerConfigToProducerSpec(Producer
if (producerConf.getBatchBuilder() != null) {
builder.setBatchBuilder(producerConf.getBatchBuilder());
}
if (producerConf.getBatchingConfig() != null) {
builder.setBatchingSpec(BatchingUtils.convert(producerConf.getBatchingConfig()));
}
if (producerConf.getCompressionType() != null) {
builder.setCompressionType(convertFromCompressionType(producerConf.getCompressionType()));
} else {
Expand All @@ -546,6 +549,9 @@ public static ProducerConfig convertProducerSpecToProducerConfig(Function.Produc
if (spec.getBatchBuilder() != null) {
producerConfig.setBatchBuilder(spec.getBatchBuilder());
}
if (spec.hasBatchingSpec()) {
producerConfig.setBatchingConfig(BatchingUtils.convertFromSpec(spec.getBatchingSpec()));
}
producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
producerConfig.setCompressionType(convertFromFunctionDetailsCompressionType(spec.getCompressionType()));
return producerConfig;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.utils;

import static org.testng.Assert.*;
import org.apache.pulsar.common.functions.BatchingConfig;
import org.apache.pulsar.functions.proto.Function;
import org.testng.annotations.Test;

public class BatchingUtilsTest {

@Test
public void testConvert() {
BatchingConfig config = BatchingConfig.builder()
.enabled(true)
.batchingMaxPublishDelayMs(30)
.roundRobinRouterBatchingPartitionSwitchFrequency(10)
.batchingMaxMessages(1000)
.batchBuilder("DEFAULT")
.build();
Function.BatchingSpec spec = BatchingUtils.convert(config);
assertEquals(spec.getEnabled(), true);
assertEquals(spec.getBatchingMaxPublishDelayMs(), 30);
assertEquals(spec.getRoundRobinRouterBatchingPartitionSwitchFrequency(), 10);
assertEquals(spec.getBatchingMaxMessages(), 1000);
assertEquals(spec.getBatchingMaxBytes(), 0);
assertEquals(spec.getBatchBuilder(), "DEFAULT");
}

@Test
public void testConvertFromSpec() {
Function.BatchingSpec spec = Function.BatchingSpec.newBuilder()
.setEnabled(true)
.setBatchingMaxPublishDelayMs(30)
.setRoundRobinRouterBatchingPartitionSwitchFrequency(10)
.setBatchingMaxMessages(1000)
.setBatchBuilder("DEFAULT")
.build();
BatchingConfig config = BatchingUtils.convertFromSpec(spec);
assertEquals(config.isEnabled(), true);
assertEquals(config.getBatchingMaxPublishDelayMs().intValue(), 30);
assertEquals(config.getRoundRobinRouterBatchingPartitionSwitchFrequency().intValue(), 10);
assertEquals(config.getBatchingMaxMessages().intValue(), 1000);
assertEquals(config.getBatchingMaxBytes(), null);
assertEquals(config.getBatchBuilder(), "DEFAULT");
}

@Test
public void testConvertFromSpecFromNull() {
BatchingConfig config = BatchingUtils.convertFromSpec(null);
assertTrue(config.isEnabled());
assertEquals(config.getBatchingMaxPublishDelayMs().intValue(), 10);
}
}
Loading

0 comments on commit fad925f

Please sign in to comment.