Round robin balancer is not balancing topic-aware #1300

Open
@CubicrootXYZ

Describe the bug

The round robin balancer does not appear to balance on a per-topic basis. Instead it cycles through partitions across all enqueued messages, regardless of which topic each message is written to.

For example, the partition index is increased by 1 for each new message, regardless of whether it is sent to the same topic as the previous one. When 4 messages are written to topic1, topic2, topic1, topic2, the partitions are chosen as follows:

  1. Topic 1, Partition 0
  2. Topic 2, Partition 1
  3. Topic 1, Partition 2
  4. Topic 2, Partition 3

As a result, half of the partitions on each topic never receive any messages. Instead I'd expect the following behavior:

  1. Topic 1, Partition 0
  2. Topic 2, Partition 0
  3. Topic 1, Partition 1
  4. Topic 2, Partition 1
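
The expected sequence above could be produced by a balancer that keeps one counter per topic. The following is only a minimal sketch under stated assumptions: `Message` is a local stand-in for `kafka.Message`, `TopicRoundRobin` is a hypothetical name, and the `Balance` method only mirrors the shape of kafka-go's `Balancer` interface so the example runs without the library.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a stand-in for kafka.Message with only the fields this sketch needs.
type Message struct {
	Topic string
	Value []byte
}

// TopicRoundRobin is a hypothetical balancer that keeps one counter per topic,
// so each topic cycles through its partitions independently.
type TopicRoundRobin struct {
	mu      sync.Mutex
	offsets map[string]int
}

// Balance returns the next partition for msg.Topic.
// It assumes at least one partition is passed in.
func (b *TopicRoundRobin) Balance(msg Message, partitions ...int) int {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.offsets == nil {
		b.offsets = make(map[string]int)
	}

	next := b.offsets[msg.Topic]
	b.offsets[msg.Topic] = next + 1

	return partitions[next%len(partitions)]
}

func main() {
	b := &TopicRoundRobin{}

	// Same write order as in the example above, with two partitions per topic.
	for _, topic := range []string{"topic1", "topic2", "topic1", "topic2"} {
		p := b.Balance(Message{Topic: topic}, 0, 1)
		fmt.Printf("%s -> partition %d\n", topic, p)
	}
}
```

With the real library, a type like this would need to accept `kafka.Message` instead of the local `Message` before it could be set as the writer's `Balancer`.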

Kafka Version

github.com/segmentio/kafka-go v0.4.47

Kafka version is unknown.

To Reproduce

A minimal working example:

	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{kafkaAddr},
		Balancer: &kafka.RoundRobin{},
	})

	err := writer.WriteMessages(
		context.Background(),
		kafka.Message{
			Topic: topic1,
			Value: []byte("Topic 1, Message 1"),
		},
		kafka.Message{
			Topic: topic2,
			Value: []byte("Topic 2, Message 2"),
		},
		kafka.Message{
			Topic: topic1,
			Value: []byte("Topic 1, Message 3"),
		},
		kafka.Message{
			Topic: topic2,
			Value: []byte("Topic 2, Message 4"),
		},
	)
	require.NoError(t, err, "could not send kafka messages")

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:     []string{kafkaAddr},
		GroupTopics: []string{topic1, topic2},
		GroupID:     "test",
	})

	partitionCount := map[string]map[int]int{}
	for range 4 {
		msg, err := reader.FetchMessage(context.Background())
		require.NoError(t, err, "could not read message")

		fmt.Printf("%s / Partition %d\n", string(msg.Value), msg.Partition)

		if partitionCount[msg.Topic] == nil {
			partitionCount[msg.Topic] = map[int]int{}
		}

		partitionCount[msg.Topic][msg.Partition]++
	}

	assert.Equal(t, 1, partitionCount[topic1][0], "expect 1 message for each partition")
	assert.Equal(t, 1, partitionCount[topic1][1], "expect 1 message for each partition")
	assert.Equal(t, 1, partitionCount[topic2][0], "expect 1 message for each partition")
	assert.Equal(t, 1, partitionCount[topic2][1], "expect 1 message for each partition")

Expected Behavior

I'd expect each topic to have 1 message on partition 0 and 1 message on partition 1.

Observed Behavior

There are 2 messages on topic 1, partition 0, and 2 messages on topic 2, partition 1.
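
This distribution is what a single counter shared by all topics would produce when each topic has two partitions. The sketch below models that arithmetic only; it is not taken from kafka-go's source. The name `sharedCounter` is hypothetical, and the partition count of 2 is an assumption inferred from the assertions in the reproduction.

```go
package main

import "fmt"

// sharedCounter models a round robin balancer with one counter for all topics.
type sharedCounter struct {
	offset int
}

// next returns the partition for the next message and advances the counter,
// without looking at the topic of the message.
func (s *sharedCounter) next(partitions int) int {
	p := s.offset % partitions
	s.offset++
	return p
}

func main() {
	s := &sharedCounter{}

	// Alternating topics with an even partition count means each topic
	// always sees the same remainder of the shared counter.
	for _, topic := range []string{"topic1", "topic2", "topic1", "topic2"} {
		fmt.Printf("%s -> partition %d\n", topic, s.next(2))
	}
}
```

Under this model topic1 only ever lands on partition 0 and topic2 only on partition 1, which matches the counts reported here.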

Additional Context
