Skip to content

Commit 9289e11

Browse files
author
Vincent Maurin
committed
Simplify flexible versions
The flexible versions is a protocol specificity for newer versions of the API. When an API is flexible, it is using more compact structures and also allow additional "dynamic" fields that could be added without the need to introduce a new API versions. This commit move the flexible versions support to the protocol layer, so it is more transparent and easy when defining Struct classes and schemas. When defining the schema, we can specify a tagged field with a tuple containing the field name and the field tag.
1 parent 7d0bd25 commit 9289e11

File tree

10 files changed

+302
-323
lines changed

10 files changed

+302
-323
lines changed

CHANGES.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ Breaking changes:
1111
`api_version` parameter has been removed from the different clients (admin/consumer/producer)
1212
(pr #1136 by @vmaurin)
1313

14+
New features:
15+
16+
* Simplify flexible versions in schema.
17+
Defining an API request or response schemas that should support
18+
flexible versions (KIP-482) is now achieved by setting `FLEXIBLE_VERSION` to True.
19+
Tagged fields could be expressed with ("name", tag) instead of just a name.
20+
1421
Improved Documentation:
1522

1623
* Fix incomplete documentation for `AIOKafkaConsumer.offset_for_times``

aiokafka/protocol/abstract.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88
class AbstractType(Generic[T], metaclass=abc.ABCMeta):
99
@classmethod
1010
@abc.abstractmethod
11-
def encode(cls, value: T) -> bytes: ...
11+
def encode(cls, value: T, flexible: bool) -> bytes: ...
1212

1313
@classmethod
1414
@abc.abstractmethod
15-
def decode(cls, data: BytesIO) -> T: ...
15+
def decode(cls, data: BytesIO, flexible: bool) -> T: ...
1616

1717
@classmethod
1818
def repr(cls, value: T) -> str:

aiokafka/protocol/admin.py

Lines changed: 30 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,13 @@
88
Array,
99
Boolean,
1010
Bytes,
11-
CompactArray,
12-
CompactString,
1311
Float64,
1412
Int8,
1513
Int16,
1614
Int32,
1715
Int64,
1816
Schema,
1917
String,
20-
TaggedFields,
2118
)
2219

2320

@@ -1453,53 +1450,48 @@ def build(
14531450
class AlterPartitionReassignmentsResponse_v0(Response):
14541451
API_KEY = 45
14551452
API_VERSION = 0
1453+
FLEXIBLE_VERSION = True
14561454
SCHEMA = Schema(
14571455
("throttle_time_ms", Int32),
14581456
("error_code", Int16),
1459-
("error_message", CompactString("utf-8")),
1457+
("error_message", String("utf-8")),
14601458
(
14611459
"responses",
1462-
CompactArray(
1463-
("name", CompactString("utf-8")),
1460+
Array(
1461+
("name", String("utf-8")),
14641462
(
14651463
"partitions",
1466-
CompactArray(
1464+
Array(
14671465
("partition_index", Int32),
14681466
("error_code", Int16),
1469-
("error_message", CompactString("utf-8")),
1470-
("tags", TaggedFields),
1467+
("error_message", String("utf-8")),
14711468
),
14721469
),
1473-
("tags", TaggedFields),
14741470
),
14751471
),
1476-
("tags", TaggedFields),
14771472
)
14781473

14791474

14801475
class AlterPartitionReassignmentsRequest_v0(RequestStruct):
1481-
FLEXIBLE_VERSION = True
14821476
API_KEY = 45
14831477
API_VERSION = 0
1478+
FLEXIBLE_VERSION = True
14841479
RESPONSE_TYPE = AlterPartitionReassignmentsResponse_v0
14851480
SCHEMA = Schema(
14861481
("timeout_ms", Int32),
14871482
(
14881483
"topics",
1489-
CompactArray(
1490-
("name", CompactString("utf-8")),
1484+
Array(
1485+
("name", String("utf-8")),
14911486
(
14921487
"partitions",
1493-
CompactArray(
1488+
Array(
14941489
("partition_index", Int32),
1495-
("replicas", CompactArray(Int32)),
1496-
("tags", TaggedFields),
1490+
("replicas", Array(Int32)),
14971491
),
14981492
),
1499-
("tags", TaggedFields),
15001493
),
15011494
),
1502-
("tags", TaggedFields),
15031495
)
15041496

15051497

@@ -1516,44 +1508,40 @@ class AlterPartitionReassignmentsRequest(
15161508
def __init__(
15171509
self,
15181510
timeout_ms: int,
1519-
topics: list[tuple[str, tuple[int, list[int], TaggedFields], TaggedFields]],
1520-
tags: TaggedFields,
1511+
topics: list[tuple[str, tuple[int, list[int]]]],
15211512
):
15221513
self._timeout_ms = timeout_ms
15231514
self._topics = topics
1524-
self._tags = tags
15251515

15261516
def build(
15271517
self, request_struct_class: type[AlterPartitionReassignmentsRequestStruct]
15281518
) -> AlterPartitionReassignmentsRequestStruct:
1529-
return request_struct_class(self._timeout_ms, self._topics, self._tags)
1519+
return request_struct_class(self._timeout_ms, self._topics)
15301520

15311521

15321522
class ListPartitionReassignmentsResponse_v0(Response):
15331523
API_KEY = 46
15341524
API_VERSION = 0
1525+
FLEXIBLE_VERSION = True
15351526
SCHEMA = Schema(
15361527
("throttle_time_ms", Int32),
15371528
("error_code", Int16),
1538-
("error_message", CompactString("utf-8")),
1529+
("error_message", String("utf-8")),
15391530
(
15401531
"topics",
1541-
CompactArray(
1542-
("name", CompactString("utf-8")),
1532+
Array(
1533+
("name", String("utf-8")),
15431534
(
15441535
"partitions",
1545-
CompactArray(
1536+
Array(
15461537
("partition_index", Int32),
1547-
("replicas", CompactArray(Int32)),
1548-
("adding_replicas", CompactArray(Int32)),
1549-
("removing_replicas", CompactArray(Int32)),
1550-
("tags", TaggedFields),
1538+
("replicas", Array(Int32)),
1539+
("adding_replicas", Array(Int32)),
1540+
("removing_replicas", Array(Int32)),
15511541
),
15521542
),
1553-
("tags", TaggedFields),
15541543
),
15551544
),
1556-
("tags", TaggedFields),
15571545
)
15581546

15591547

@@ -1566,13 +1554,11 @@ class ListPartitionReassignmentsRequest_v0(RequestStruct):
15661554
("timeout_ms", Int32),
15671555
(
15681556
"topics",
1569-
CompactArray(
1570-
("name", CompactString("utf-8")),
1571-
("partition_index", CompactArray(Int32)),
1572-
("tags", TaggedFields),
1557+
Array(
1558+
("name", String("utf-8")),
1559+
("partition_index", Array(Int32)),
15731560
),
15741561
),
1575-
("tags", TaggedFields),
15761562
)
15771563

15781564

@@ -1589,17 +1575,15 @@ class ListPartitionReassignmentsRequest(
15891575
def __init__(
15901576
self,
15911577
timeout_ms: int,
1592-
topics: list[tuple[str, tuple[int, list[int], TaggedFields], TaggedFields]],
1593-
tags: TaggedFields,
1578+
topics: list[tuple[str, tuple[int, list[int]]]],
15941579
):
15951580
self._timeout_ms = timeout_ms
15961581
self._topics = topics
1597-
self._tags = tags
15981582

15991583
def build(
16001584
self, request_struct_class: type[ListPartitionReassignmentsRequestStruct]
16011585
) -> ListPartitionReassignmentsRequestStruct:
1602-
return request_struct_class(self._timeout_ms, self._topics, self._tags)
1586+
return request_struct_class(self._timeout_ms, self._topics)
16031587

16041588

16051589
class DeleteRecordsResponse_v0(Response):
@@ -1633,26 +1617,8 @@ class DeleteRecordsResponse_v1(Response):
16331617
class DeleteRecordsResponse_v2(Response):
16341618
API_KEY = 21
16351619
API_VERSION = 2
1636-
SCHEMA = Schema(
1637-
("throttle_time_ms", Int32),
1638-
(
1639-
"topics",
1640-
CompactArray(
1641-
("name", CompactString("utf-8")),
1642-
(
1643-
"partitions",
1644-
CompactArray(
1645-
("partition_index", Int32),
1646-
("low_watermark", Int64),
1647-
("error_code", Int16),
1648-
("tags", TaggedFields),
1649-
),
1650-
),
1651-
("tags", TaggedFields),
1652-
),
1653-
),
1654-
("tags", TaggedFields),
1655-
)
1620+
FLEXIBLE_VERSION = True
1621+
SCHEMA = DeleteRecordsResponse_v0.SCHEMA
16561622

16571623

16581624
class DeleteRecordsRequest_v0(RequestStruct):
@@ -1689,25 +1655,7 @@ class DeleteRecordsRequest_v2(RequestStruct):
16891655
API_VERSION = 2
16901656
FLEXIBLE_VERSION = True
16911657
RESPONSE_TYPE = DeleteRecordsResponse_v2
1692-
SCHEMA = Schema(
1693-
(
1694-
"topics",
1695-
CompactArray(
1696-
("name", CompactString("utf-8")),
1697-
(
1698-
"partitions",
1699-
CompactArray(
1700-
("partition_index", Int32),
1701-
("offset", Int64),
1702-
("tags", TaggedFields),
1703-
),
1704-
),
1705-
("tags", TaggedFields),
1706-
),
1707-
),
1708-
("timeout_ms", Int32),
1709-
("tags", TaggedFields),
1710-
)
1658+
SCHEMA = DeleteRecordsResponse_v0.SCHEMA
17111659

17121660

17131661
DeleteRecordsRequestStruct: TypeAlias = (
@@ -1722,43 +1670,20 @@ def __init__(
17221670
self,
17231671
topics: Iterable[tuple[str, Iterable[tuple[int, int]]]],
17241672
timeout_ms: int,
1725-
tags: dict[int, bytes] | None = None,
17261673
) -> None:
17271674
self._topics = topics
17281675
self._timeout_ms = timeout_ms
1729-
self._tags = tags
17301676

17311677
def build(
17321678
self, request_struct_class: type[DeleteRecordsRequestStruct]
17331679
) -> DeleteRecordsRequestStruct:
1734-
if request_struct_class.API_VERSION < 2:
1735-
if self._tags is not None:
1736-
raise IncompatibleBrokerVersion(
1737-
"tags requires DeleteRecordsRequest >= v2"
1738-
)
1739-
1740-
return request_struct_class(
1741-
[
1742-
(
1743-
topic,
1744-
list(partitions),
1745-
)
1746-
for (topic, partitions) in self._topics
1747-
],
1748-
self._timeout_ms,
1749-
)
17501680
return request_struct_class(
17511681
[
17521682
(
17531683
topic,
1754-
[
1755-
(partition, before_offset, {})
1756-
for partition, before_offset in partitions
1757-
],
1758-
{},
1684+
list(partitions),
17591685
)
17601686
for (topic, partitions) in self._topics
17611687
],
17621688
self._timeout_ms,
1763-
self._tags or {},
17641689
)

aiokafka/protocol/api.py

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from aiokafka.errors import IncompatibleBrokerVersion
99

1010
from .struct import Struct
11-
from .types import Array, Int16, Int32, Schema, String, TaggedFields
11+
from .types import Array, Int16, Int32, Schema, String
1212

1313

1414
class RequestHeader_v0(Struct):
@@ -31,24 +31,17 @@ def __init__(
3131

3232

3333
class RequestHeader_v1(Struct):
34-
# Flexible response / request headers end in field buffer
35-
SCHEMA = Schema(
36-
("api_key", Int16),
37-
("api_version", Int16),
38-
("correlation_id", Int32),
39-
("client_id", String("utf-8")),
40-
("tags", TaggedFields),
41-
)
34+
SCHEMA = RequestHeader_v0.SCHEMA
35+
FLEXIBLE_VERSION = True
4236

4337
def __init__(
4438
self,
4539
request: RequestStruct,
4640
correlation_id: int = 0,
4741
client_id: str = "aiokafka",
48-
tags: dict[int, bytes] | None = None,
4942
):
5043
super().__init__(
51-
request.API_KEY, request.API_VERSION, correlation_id, client_id, tags or {}
44+
request.API_KEY, request.API_VERSION, correlation_id, client_id
5245
)
5346

5447

@@ -61,8 +54,8 @@ class ResponseHeader_v0(Struct):
6154
class ResponseHeader_v1(Struct):
6255
SCHEMA = Schema(
6356
("correlation_id", Int32),
64-
("tags", TaggedFields),
6557
)
58+
FLEXIBLE_VERSION = True
6659

6760

6861
T = TypeVar("T", bound="RequestStruct")
@@ -150,7 +143,7 @@ class RequestStruct(Struct, metaclass=abc.ABCMeta):
150143
Attributes
151144
----------
152145
FLEXIBLE_VERSION : bool
153-
Use request header with flexible tags
146+
Support flexible/compact format
154147
API_KEY : int
155148
The unique API key identifying the request.
156149
API_VERSION : int
@@ -161,11 +154,9 @@ class RequestStruct(Struct, metaclass=abc.ABCMeta):
161154
An instance of Schema() representing the request structure.
162155
"""
163156

164-
FLEXIBLE_VERSION: ClassVar[bool] = False
165157
API_KEY: ClassVar[int]
166158
API_VERSION: ClassVar[int]
167159
RESPONSE_TYPE: ClassVar[type[Response]]
168-
SCHEMA: ClassVar[Schema]
169160

170161
def __init_subclass__(cls) -> None:
171162
super().__init_subclass__()
@@ -203,20 +194,8 @@ def parse_response_header(
203194

204195

205196
class Response(Struct, metaclass=abc.ABCMeta):
206-
@property
207-
@abc.abstractmethod
208-
def API_KEY(self) -> int:
209-
"""Integer identifier for api request/response"""
210-
211-
@property
212-
@abc.abstractmethod
213-
def API_VERSION(self) -> int:
214-
"""Integer of api request/response version"""
215-
216-
@property
217-
@abc.abstractmethod
218-
def SCHEMA(self) -> Schema:
219-
"""An instance of Schema() representing the response structure"""
197+
API_KEY: ClassVar[int]
198+
API_VERSION: ClassVar[int]
220199

221200
def to_object(self) -> dict[str, Any]:
222201
return _to_object(self.SCHEMA, self)

0 commit comments

Comments
 (0)