[KIP-848] User documentation (#4702)
emasab authored May 7, 2024
1 parent a5c5641 commit 18bc849
Showing 3 changed files with 122 additions and 5 deletions.
24 changes: 20 additions & 4 deletions CHANGELOG.md
@@ -2,7 +2,12 @@

librdkafka v2.4.0 is a feature release:

* [KIP-467](https://cwiki.apache.org/confluence/display/KAFKA/KIP-467%3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records) Augment ProduceResponse error messaging for specific culprit records (#4583).
* [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): The Next Generation of the Consumer Rebalance Protocol.
**Early Access**: This should be used only for evaluation and must not be used in production. The features and contract of this KIP might change in the future (#4610).
* [KIP-467](https://cwiki.apache.org/confluence/display/KAFKA/KIP-467%3A+Augment+ProduceResponse+error+messaging+for+specific+culprit+records): Augment ProduceResponse error messaging for specific culprit records (#4583).
* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4676)
* Upgrade OpenSSL to v3.0.12 (while building from source) with various security fixes,
check the [release notes](https://www.openssl.org/news/cl30.txt).
* Integration tests can be started in KRaft mode and run against any
@@ -12,9 +17,6 @@ librdkafka v2.4.0 is a feature release:
max period of 1 ms (#4671).
* Fixed a bug causing duplicate message consumption from a stale
fetch start offset in some particular cases (#4636)
* [KIP-516](https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers)
Continue partial implementation by adding a metadata cache by topic id
and updating the topic id corresponding to the partition name (#4676)
* Fix to metadata cache expiration on full metadata refresh (#4677).
* Fix for a wrong error returned on full metadata refresh before joining
a consumer group (#4678).
@@ -34,6 +36,20 @@ librdkafka v2.4.0 is a feature release:
depending on the application logic (#4583).


## Early Access

### [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol): The Next Generation of the Consumer Rebalance Protocol
* With this new protocol the role of the Group Leader (a member) is removed and
the assignment is calculated by the Group Coordinator (a broker) and sent
to each member through heartbeats.

The feature is still _not production-ready_.
It's possible to try it in a non-production environment.

A [guide](INTRODUCTION.md#next-generation-of-the-consumer-group-protocol-kip-848) is available
with considerations and steps to follow to test it (#4610).


## Fixes

### General fixes
96 changes: 95 additions & 1 deletion INTRODUCTION.md
@@ -72,6 +72,7 @@ librdkafka also provides a native C++ interface.
- [Auto offset reset](#auto-offset-reset)
- [Consumer groups](#consumer-groups)
- [Static consumer groups](#static-consumer-groups)
- [Next generation of the consumer group protocol](#next-generation-of-the-consumer-group-protocol-kip-848)
- [Topics](#topics)
- [Unknown or unauthorized topics](#unknown-or-unauthorized-topics)
- [Topic metadata propagation for newly created topics](#topic-metadata-propagation-for-newly-created-topics)
@@ -1540,6 +1541,98 @@ the original fatal error code and reason.
To read more about static group membership, see [KIP-345](https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances).


### Next generation of the consumer group protocol: [KIP 848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol)

librdkafka 2.4.0 introduces the next generation consumer group rebalance
protocol defined in KIP 848.

**Warning**
It's still in **Early Access**, which means it's _not production-ready_:
it's still under validation and lacks some needed features.
Features and their contract might change in the future.

With this protocol the role of the Group Leader (a member) is removed and
the assignment is calculated by the Group Coordinator (a broker) and sent
to each member through heartbeats.

To test it, set up a Kafka cluster in KRaft mode and enable the new group
protocol with the `group.coordinator.rebalance.protocols` broker property.
The broker version must be Apache Kafka 3.7.0 or newer. See the Apache Kafka
[Release Notes](https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes).
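
As a rough illustration of the broker-side step, the fragment below shows how the property might appear in a KRaft broker's `server.properties`. The value `classic,consumer` is an assumption based on the Apache Kafka 3.7 broker documentation, not something stated in this guide, so check the release notes linked above for the exact settings your broker version needs.

```properties
# Assumed value: keep the classic protocol and additionally enable
# the KIP-848 "consumer" protocol (Early Access, not for production).
group.coordinator.rebalance.protocols=classic,consumer
```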

On the client side, it can be enabled by setting the new property `group.protocol=consumer`.
A second new property, `group.remote.assignor`, selects the desired
remote assignor.

**Available features**

- Subscription to one or more topics
- Rebalance callbacks (see contract changes)
- Static group membership
- Configure remote assignor
- Max poll interval is enforced
- Offline upgrade from an empty consumer group with committed offsets

**Future features**

- Regular expression support when subscribing
- AdminClient changes as described in the KIP

**Contract changes**

The new protocol requires some contract changes,
so it will be enabled by default only in a major librdkafka release.

- Deprecated client configurations with the new protocol:
  - `partition.assignment.strategy`, replaced by `group.remote.assignor`
  - `session.timeout.ms`, replaced by broker configuration `group.consumer.session.timeout.ms`
  - `heartbeat.interval.ms`, replaced by broker configuration `group.consumer.heartbeat.interval.ms`
  - `group.protocol.type`, which is not used in the new protocol

- Protocol rebalance is fully incremental, so the only functions allowed
in a rebalance callback will be `rd_kafka_incremental_assign` and
`rd_kafka_incremental_unassign`. Currently existing code still works:
the function expected to be called is determined by the chosen
`partition.assignment.strategy`, but this behavior will be removed in the next
release.

When the `group.remote.assignor` property is set, using the incremental
assign and unassign functions is already required.
With the new protocol all assignors are sticky, including the _range_ one, which wasn't sticky before.

- With static group membership, if two members use the same
`group.instance.id`, the one that joins the consumer group later will be
fenced with the fatal `UNRELEASED_INSTANCE_ID` error. Previously, the existing
member was the one fenced. This was changed to avoid two members contending for the
same id. It also means that an instance that crashes won't be automatically
replaced by a new instance until the session times out, so it's especially important
to check that consumers are closed properly on shutdown. It's also important
to ensure that no two instances with the same `group.instance.id` are running
at any time.

- Session timeout is enforced only remotely: if the Coordinator isn't reachable
by a member, the member will continue to fetch messages, even if it won't be able to
commit them. Otherwise, the member will be fenced as soon as it receives a
heartbeat response from the Coordinator.
With the `classic` protocol, instead, the member stops fetching when the session timeout
expires on the client.

For the same reason, when closing or unsubscribing with auto-commit set,
the member will try to commit until a specific timeout has passed.
Currently the timeout is the same as in the `classic` protocol and corresponds
to `session.timeout.ms`, but it will change before the feature
reaches a stable state.

- An `UNKNOWN_TOPIC_OR_PART` error isn't received anymore when a consumer
subscribes to a topic that doesn't exist in the local cache, because the
subscription stays in place and the topic could be created just after that.

- A consumer won't do a preliminary Metadata call that returns a
`TOPIC_AUTHORIZATION_FAILED` error, as happens with the `classic` group protocol.
Topic partitions will still be assigned to the member
by the Coordinator only if it's authorized to consume from the topic.


### Note on Batch consume APIs

Using multiple instances of `rd_kafka_consume_batch()` and/or `rd_kafka_consume_batch_queue()`
@@ -1951,7 +2044,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-559 - Make the Kafka Protocol Friendlier with L7 Proxies | 2.5.0 | Not supported |
| KIP-568 - Explicit rebalance triggering on the Consumer | 2.6.0 | Not supported |
| KIP-659 - Add metadata to DescribeConfigsResponse | 2.6.0 | Not supported |
| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 (WIP) | supported |
| KIP-580 - Exponential backoff for Kafka clients | 3.7.0 (WIP) | Supported |
| KIP-584 - Versioning scheme for features | WIP | Not supported |
| KIP-588 - Allow producers to recover gracefully from txn timeouts | 2.8.0 (WIP) | Not supported |
| KIP-601 - Configurable socket connection timeout | 2.7.0 | Supported |
@@ -1961,6 +2054,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-735 - Increase default consumer session timeout | 3.0.0 | Supported |
| KIP-768 - SASL/OAUTHBEARER OIDC support | 3.0 | Supported |
| KIP-881 - Rack-aware Partition Assignment for Kafka Consumers | 3.5.0 (WIP) | Supported |
| KIP-848 - The Next Generation of the Consumer Rebalance Protocol | 3.7.0 (EA) | Early Access |



7 changes: 7 additions & 0 deletions src/rdkafka_cgrp.c
@@ -486,6 +486,13 @@ rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk,
rk->rk_conf.auto_commit_interval_ms * 1000ll,
rd_kafka_cgrp_offset_commit_tmr_cb, rkcg);

        if (rkcg->rkcg_group_protocol == RD_KAFKA_GROUP_PROTOCOL_CONSUMER) {
                rd_kafka_log(
                    rk, LOG_WARNING, "CGRP",
                    "KIP-848 Consumer Group Protocol is in Early Access "
                    "and MUST NOT be used in production");
        }

        return rkcg;
}
