Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ Breaking changes:
`api_version` parameter has been removed from the different clients (admin/consumer/producer)
(pr #1136 by @vmaurin)

New features:

* Simplify flexible versions in schema.
Defining an API request or response schemas that should support
flexible versions (KIP-482) is now achieved by setting `FLEXIBLE_VERSION` to True.
Tagged fields could be expressed with ("name", tag) instead of just a name.
(pr #1139 by @vmaurin)

Improved Documentation:

* Fix incomplete documentation for `AIOKafkaConsumer.offset_for_times``
Expand Down
6 changes: 4 additions & 2 deletions aiokafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,11 @@ def send(self, request, expect_response=True):
) from err

log.debug(
"Request to %s:%d %d: %s",
"Request to %s:%d %d: %s, %s",
self._host,
self._port,
correlation_id,
header,
request_struct,
)

Expand Down Expand Up @@ -565,10 +566,11 @@ def _handle_frame(self, resp):
if not fut.done():
response = resp_type.decode(resp)
log.debug(
"Response from %s:%d %d: %s",
"Response from %s:%d %d: %s, %s",
self._host,
self._port,
correlation_id,
response_header,
response,
)
fut.set_result(response)
Expand Down
4 changes: 2 additions & 2 deletions aiokafka/protocol/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
class AbstractType(Generic[T], metaclass=abc.ABCMeta):
@classmethod
@abc.abstractmethod
def encode(cls, value: T) -> bytes: ...
def encode(cls, value: T, flexible: bool) -> bytes: ...

Check notice

Code scanning / CodeQL

Statement has no effect Note

This statement has no effect.

@classmethod
@abc.abstractmethod
def decode(cls, data: BytesIO) -> T: ...
def decode(cls, data: BytesIO, flexible: bool) -> T: ...

Check notice

Code scanning / CodeQL

Statement has no effect Note

This statement has no effect.

@classmethod
def repr(cls, value: T) -> str:
Expand Down
135 changes: 30 additions & 105 deletions aiokafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@
Array,
Boolean,
Bytes,
CompactArray,
CompactString,
Float64,
Int8,
Int16,
Int32,
Int64,
Schema,
String,
TaggedFields,
)


Expand Down Expand Up @@ -1453,53 +1450,48 @@ def build(
class AlterPartitionReassignmentsResponse_v0(Response):
API_KEY = 45
API_VERSION = 0
FLEXIBLE_VERSION = True
SCHEMA = Schema(
("throttle_time_ms", Int32),
("error_code", Int16),
("error_message", CompactString("utf-8")),
("error_message", String("utf-8")),
(
"responses",
CompactArray(
("name", CompactString("utf-8")),
Array(
("name", String("utf-8")),
(
"partitions",
CompactArray(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From code here it's not obvious if correct (compact) form will be used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to #1139 (comment)

In the java client json, you can see they just say "it is an array" https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/AlterPartitionReassignmentsResponse.json#L36

Then, it is because the version is marked "flexible" that it is using the more compact serialization

Array(
("partition_index", Int32),
("error_code", Int16),
("error_message", CompactString("utf-8")),
("tags", TaggedFields),
("error_message", String("utf-8")),
),
),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
)


class AlterPartitionReassignmentsRequest_v0(RequestStruct):
FLEXIBLE_VERSION = True
API_KEY = 45
API_VERSION = 0
FLEXIBLE_VERSION = True
RESPONSE_TYPE = AlterPartitionReassignmentsResponse_v0
SCHEMA = Schema(
("timeout_ms", Int32),
(
"topics",
CompactArray(
("name", CompactString("utf-8")),
Array(
("name", String("utf-8")),
(
"partitions",
CompactArray(
Array(
("partition_index", Int32),
("replicas", CompactArray(Int32)),
("tags", TaggedFields),
("replicas", Array(Int32)),
),
),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
)


Expand All @@ -1516,44 +1508,40 @@ class AlterPartitionReassignmentsRequest(
def __init__(
self,
timeout_ms: int,
topics: list[tuple[str, tuple[int, list[int], TaggedFields], TaggedFields]],
tags: TaggedFields,
topics: list[tuple[str, tuple[int, list[int]]]],
):
self._timeout_ms = timeout_ms
self._topics = topics
self._tags = tags

def build(
self, request_struct_class: type[AlterPartitionReassignmentsRequestStruct]
) -> AlterPartitionReassignmentsRequestStruct:
return request_struct_class(self._timeout_ms, self._topics, self._tags)
return request_struct_class(self._timeout_ms, self._topics)


class ListPartitionReassignmentsResponse_v0(Response):
API_KEY = 46
API_VERSION = 0
FLEXIBLE_VERSION = True
SCHEMA = Schema(
("throttle_time_ms", Int32),
("error_code", Int16),
("error_message", CompactString("utf-8")),
("error_message", String("utf-8")),
(
"topics",
CompactArray(
("name", CompactString("utf-8")),
Array(
("name", String("utf-8")),
(
"partitions",
CompactArray(
Array(
("partition_index", Int32),
("replicas", CompactArray(Int32)),
("adding_replicas", CompactArray(Int32)),
("removing_replicas", CompactArray(Int32)),
("tags", TaggedFields),
("replicas", Array(Int32)),
("adding_replicas", Array(Int32)),
("removing_replicas", Array(Int32)),
),
),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
)


Expand All @@ -1566,13 +1554,11 @@ class ListPartitionReassignmentsRequest_v0(RequestStruct):
("timeout_ms", Int32),
(
"topics",
CompactArray(
("name", CompactString("utf-8")),
("partition_index", CompactArray(Int32)),
("tags", TaggedFields),
Array(
("name", String("utf-8")),
("partition_index", Array(Int32)),
),
),
("tags", TaggedFields),
)


Expand All @@ -1589,17 +1575,15 @@ class ListPartitionReassignmentsRequest(
def __init__(
self,
timeout_ms: int,
topics: list[tuple[str, tuple[int, list[int], TaggedFields], TaggedFields]],
tags: TaggedFields,
topics: list[tuple[str, tuple[int, list[int]]]],
):
self._timeout_ms = timeout_ms
self._topics = topics
self._tags = tags

def build(
self, request_struct_class: type[ListPartitionReassignmentsRequestStruct]
) -> ListPartitionReassignmentsRequestStruct:
return request_struct_class(self._timeout_ms, self._topics, self._tags)
return request_struct_class(self._timeout_ms, self._topics)


class DeleteRecordsResponse_v0(Response):
Expand Down Expand Up @@ -1633,26 +1617,8 @@ class DeleteRecordsResponse_v1(Response):
class DeleteRecordsResponse_v2(Response):
API_KEY = 21
API_VERSION = 2
SCHEMA = Schema(
("throttle_time_ms", Int32),
(
"topics",
CompactArray(
("name", CompactString("utf-8")),
(
"partitions",
CompactArray(
("partition_index", Int32),
("low_watermark", Int64),
("error_code", Int16),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
)
FLEXIBLE_VERSION = True
SCHEMA = DeleteRecordsResponse_v0.SCHEMA


class DeleteRecordsRequest_v0(RequestStruct):
Expand Down Expand Up @@ -1689,25 +1655,7 @@ class DeleteRecordsRequest_v2(RequestStruct):
API_VERSION = 2
FLEXIBLE_VERSION = True
RESPONSE_TYPE = DeleteRecordsResponse_v2
SCHEMA = Schema(
(
"topics",
CompactArray(
("name", CompactString("utf-8")),
(
"partitions",
CompactArray(
("partition_index", Int32),
("offset", Int64),
("tags", TaggedFields),
),
),
("tags", TaggedFields),
),
),
("timeout_ms", Int32),
("tags", TaggedFields),
)
SCHEMA = DeleteRecordsRequest_v0.SCHEMA


DeleteRecordsRequestStruct: TypeAlias = (
Expand All @@ -1722,43 +1670,20 @@ def __init__(
self,
topics: Iterable[tuple[str, Iterable[tuple[int, int]]]],
timeout_ms: int,
tags: dict[int, bytes] | None = None,
) -> None:
self._topics = topics
self._timeout_ms = timeout_ms
self._tags = tags

def build(
self, request_struct_class: type[DeleteRecordsRequestStruct]
) -> DeleteRecordsRequestStruct:
if request_struct_class.API_VERSION < 2:
if self._tags is not None:
raise IncompatibleBrokerVersion(
"tags requires DeleteRecordsRequest >= v2"
)

return request_struct_class(
[
(
topic,
list(partitions),
)
for (topic, partitions) in self._topics
],
self._timeout_ms,
)
return request_struct_class(
[
(
topic,
[
(partition, before_offset, {})
for partition, before_offset in partitions
],
{},
list(partitions),
)
for (topic, partitions) in self._topics
],
self._timeout_ms,
self._tags or {},
)
Loading
Loading