Exception thrown if calling Seek earliest multiple times after each other #283

@henrikstengaard

I'm having trouble seeking in a Pulsar topic. When I call Seek more than once on an IConsumer subscribed to the topic, for example consumer.SeekAsync(MessageId.Earliest) twice in a row, the second call throws a NotConnectedException with the message "Not connected to broker".

Is this the expected behavior, or am I doing something wrong?

Steps to reproduce:

First I started Pulsar with Docker as described here https://pulsar.apache.org/docs/4.0.x/standalone-docker/:

docker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:4.0.0 bin/pulsar standalone

Then I ran the following code, based on the Simple.cs example found in the examples:

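using System;
using System.Text;
using System.Threading.Tasks;
using Pulsar.Client.Api;
using Pulsar.Client.Common;

public static class PulsarClientSeekIssue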
{
    public static async Task Reproduce()
    {
        const string serviceUrl = "pulsar://localhost:6650";
        const string subscriptionName = "my-subscription";
        var topicName = $"my-topic-{DateTime.Now.Ticks}";

        var client = await new PulsarClientBuilder()
            .ServiceUrl(serviceUrl)
            .BuildAsync();

        var producer = await client.NewProducer()
            .Topic(topicName)
            .CreateAsync();

        var consumer = await client.NewConsumer()
            .Topic(topicName)
            .SubscriptionName(subscriptionName)
            .SubscribeAsync();

        for (var i = 0; i < 10; i++)
        {
            await Send(producer, $"Sent message {i + 1} from C# at '{DateTime.Now}'");
        }

        await consumer.SeekAsync(MessageId.Earliest);
        
        // Throws exception NotConnectedException "Not connected to broker"
        await consumer.SeekAsync(MessageId.Earliest);

        await Receive(consumer);
    }

    private static async Task<MessageId> Send(IProducer<byte[]> producer, string message)
    {
        var messageId = await producer.SendAsync(Encoding.UTF8.GetBytes(message));

        Console.WriteLine($"Sent message: '{message}'");
        Console.WriteLine($"MessageId: '{messageId}'");
        
        await Task.Delay(500);
        
        return messageId;
    }

    private static async Task<Message<byte[]>> Receive(IConsumer<byte[]> consumer)
    {
        var message = await consumer.ReceiveAsync();
        
        Console.WriteLine("Received message");
        Console.WriteLine($"PublishTime: {message.PublishTime}");
        Console.WriteLine($"MessageId: '{message.MessageId}'");
        Console.WriteLine($"Data: {Encoding.UTF8.GetString(message.Data)}");
        
        return message;
    }
}

Labels: enhancement (New feature or request)
