Skip to content
This repository was archived by the owner on Dec 14, 2022. It is now read-only.
This repository was archived by the owner on Dec 14, 2022. It is now read-only.

[BUG] Cumulative Acknowledgement is not happening in the flink-connector-pulsar #610

@anoop-khandelwal

Description

@anoop-khandelwal

Describe the bug
We are using the below libaries-
Flink-1.15.0
Pulsar- 2.8.2
flink-connector-pulsar=1.15.0

To Reproduce
Steps to reproduce the behavior:

  1. TestJob
    `public class TestJob {
    public static void main(String[] args) {
    String authParams = String.format("token:%s", PULSAR_CLIENT_AUTH_TOKEN);
    String topicPattern = "persistent://a/b/test";
    List topics = new ArrayList();
    topics.add(topicPattern);

     Properties properties = new Properties();
     properties.setProperty(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME.key(), AuthenticationToken.class.getName());
     properties.setProperty(PulsarOptions.PULSAR_AUTH_PARAMS.key(), authParams);
     properties.setProperty(PulsarOptions.PULSAR_TLS_TRUST_CERTS_FILE_PATH.key(),PULSAR_CERT_PATH);
     properties.setProperty(PulsarOptions.PULSAR_SERVICE_URL.key(), PULSAR_HOST);
     properties.setProperty(PulsarOptions.PULSAR_CONNECT_TIMEOUT.key(),"600000");
     properties.setProperty(PulsarOptions.PULSAR_READ_TIMEOUT.key(),"600000");
     properties.setProperty(PulsarOptions.PULSAR_REQUEST_TIMEOUT.key(),"600000");
     properties.setProperty(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE.key(),Boolean.TRUE.toString());
    
     PulsarSource<String> src = PulsarSource.builder()
             .setServiceUrl(PULSAR_HOST)
             .setAdminUrl(PULSAR_ADMIN_HOST)
             .setProperties(properties)
             .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS,10000000L)
             .setStartCursor(StartCursor.earliest())
             .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
             .setSubscriptionName("test-subscription-local")
             .setSubscriptionType(SubscriptionType.Failover)
             .setConsumerName(String.format("test-consumer-local"))
             .setTopics(topics).build();
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.getConfig().setAutoWatermarkInterval(0L);
     env.addDefaultKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
     String sourceName = String.format("pulsar-source-local");
     DataStream<String> stream =  env.fromSource(src, WatermarkStrategy.noWatermarks(),sourceName)
             .setParallelism(1)
             .uid(sourceName)
             .name(sourceName);
    
     stream
             .process(new TestProcessFunction()).setParallelism(1)
             .uid(String.format("test-job-pf"))
             .name(String.format("test-job-pf"))
             .addSink(new TestSink()).setParallelism(1)
             .uid(String.format("sink-job"))
             .name(String.format("sink-job"));
    

    }
    }`

Messages = M1.....M10
Expected behavior
Upon the acknowledgment, messages should not be appearing again.
Upon job restart after ensuring it has processed all the messages, the messages keep coming back.
We saw that the cumulativeAcknowledgement() function is invoked all the time with or without checkpoint enabled.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions