Skip to content

Commit db18a0e

Browse files
authored
Support amqp arguments (#35)
* Add safe root model handling for path searching in json codec * Add arguments to amqp channels * Format code
1 parent a4b09e4 commit db18a0e

File tree

5 files changed

+71
-6
lines changed

5 files changed

+71
-6
lines changed

src/asyncapi_python/contrib/wire/amqp/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class AmqpConfig:
2525
binding_type: AmqpBindingType = AmqpBindingType.QUEUE
2626
queue_properties: dict[str, Any] = field(default_factory=lambda: {})
2727
binding_arguments: dict[str, Any] = field(default_factory=lambda: {})
28+
arguments: dict[str, Any] = field(default_factory=lambda: {})
2829

2930
def to_producer_args(self) -> dict[str, Any]:
3031
"""Convert to AmqpProducer constructor arguments"""
@@ -34,6 +35,7 @@ def to_producer_args(self) -> dict[str, Any]:
3435
"exchange_type": self.exchange_type,
3536
"routing_key": self.routing_key,
3637
"queue_properties": self.queue_properties,
38+
"arguments": self.arguments,
3739
}
3840

3941
def to_consumer_args(self) -> dict[str, Any]:
@@ -46,4 +48,5 @@ def to_consumer_args(self) -> dict[str, Any]:
4648
"binding_type": self.binding_type,
4749
"queue_properties": self.queue_properties,
4850
"binding_arguments": self.binding_arguments,
51+
"arguments": self.arguments,
4952
}

src/asyncapi_python/contrib/wire/amqp/consumer.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ def __init__(
3636
binding_type: AmqpBindingType = AmqpBindingType.QUEUE,
3737
queue_properties: dict[str, Any] | None = None,
3838
binding_arguments: dict[str, Any] | None = None,
39+
arguments: dict[str, Any] | None = None,
3940
):
4041
self._connection = connection
4142
self._queue_name = queue_name
@@ -45,6 +46,7 @@ def __init__(
4546
self._binding_type = binding_type
4647
self._queue_properties = queue_properties or {}
4748
self._binding_arguments = binding_arguments or {}
49+
self._arguments = arguments or {}
4850
self._channel: AbstractChannel | None = None
4951
self._queue: AbstractQueue | None = None
5052
self._exchange: AbstractExchange | None = None
@@ -67,6 +69,7 @@ async def start(self) -> None:
6769
durable=self._queue_properties.get("durable", True),
6870
exclusive=self._queue_properties.get("exclusive", False),
6971
auto_delete=self._queue_properties.get("auto_delete", False),
72+
arguments=self._arguments,
7073
)
7174

7275
# Simple queue binding pattern (default exchange)
@@ -76,6 +79,7 @@ async def start(self) -> None:
7679
durable=self._queue_properties.get("durable", True),
7780
exclusive=self._queue_properties.get("exclusive", False),
7881
auto_delete=self._queue_properties.get("auto_delete", False),
82+
arguments=self._arguments,
7983
)
8084

8185
# Routing key binding pattern (pub/sub with named exchange)
@@ -87,24 +91,28 @@ async def start(self) -> None:
8791
name=self._exchange_name,
8892
type=ExchangeType.DIRECT,
8993
durable=True,
94+
arguments=self._arguments,
9095
)
9196
case "topic":
9297
self._exchange = await self._channel.declare_exchange(
9398
name=self._exchange_name,
9499
type=ExchangeType.TOPIC,
95100
durable=True,
101+
arguments=self._arguments,
96102
)
97103
case "fanout":
98104
self._exchange = await self._channel.declare_exchange(
99105
name=self._exchange_name,
100106
type=ExchangeType.FANOUT,
101107
durable=True,
108+
arguments=self._arguments,
102109
)
103110
case "headers":
104111
self._exchange = await self._channel.declare_exchange(
105112
name=self._exchange_name,
106113
type=ExchangeType.HEADERS,
107114
durable=True,
115+
arguments=self._arguments,
108116
)
109117
case unknown_type:
110118
raise ValueError(f"Unsupported exchange type: {unknown_type}")
@@ -115,6 +123,7 @@ async def start(self) -> None:
115123
durable=self._queue_properties.get("durable", False),
116124
exclusive=self._queue_properties.get("exclusive", True),
117125
auto_delete=self._queue_properties.get("auto_delete", True),
126+
arguments=self._arguments,
118127
)
119128

120129
# Bind queue to exchange with routing key
@@ -129,24 +138,28 @@ async def start(self) -> None:
129138
name=self._exchange_name,
130139
type=ExchangeType.FANOUT,
131140
durable=True,
141+
arguments=self._arguments,
132142
)
133143
case "headers":
134144
self._exchange = await self._channel.declare_exchange(
135145
name=self._exchange_name,
136146
type=ExchangeType.HEADERS,
137147
durable=True,
148+
arguments=self._arguments,
138149
)
139150
case "topic":
140151
self._exchange = await self._channel.declare_exchange(
141152
name=self._exchange_name,
142153
type=ExchangeType.TOPIC,
143154
durable=True,
155+
arguments=self._arguments,
144156
)
145157
case "direct":
146158
self._exchange = await self._channel.declare_exchange(
147159
name=self._exchange_name,
148160
type=ExchangeType.DIRECT,
149161
durable=True,
162+
arguments=self._arguments,
150163
)
151164
case unknown_type:
152165
raise ValueError(f"Unsupported exchange type: {unknown_type}")
@@ -157,6 +170,7 @@ async def start(self) -> None:
157170
durable=self._queue_properties.get("durable", False),
158171
exclusive=self._queue_properties.get("exclusive", True),
159172
auto_delete=self._queue_properties.get("auto_delete", True),
173+
arguments=self._arguments,
160174
)
161175

162176
# Bind queue to exchange with binding arguments (for headers exchange)

src/asyncapi_python/contrib/wire/amqp/producer.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,15 @@ def __init__(
3131
exchange_type: str = "direct",
3232
routing_key: str = "",
3333
queue_properties: dict[str, Any] | None = None,
34+
arguments: dict[str, Any] | None = None,
3435
):
3536
self._connection = connection
3637
self._queue_name = queue_name
3738
self._exchange_name = exchange_name
3839
self._exchange_type = exchange_type
3940
self._routing_key = routing_key
4041
self._queue_properties = queue_properties or {}
42+
self._arguments = arguments or {}
4143
self._channel: AbstractChannel | None = None
4244
self._target_exchange: AbstractExchange | None = None
4345
self._started = False
@@ -61,27 +63,40 @@ async def start(self) -> None:
6163
durable=self._queue_properties.get("durable", True),
6264
exclusive=self._queue_properties.get("exclusive", False),
6365
auto_delete=self._queue_properties.get("auto_delete", False),
66+
arguments=self._arguments,
6467
)
6568

6669
# Named exchange patterns
6770
case (exchange_name, "direct"):
6871
self._target_exchange = await self._channel.declare_exchange(
69-
name=exchange_name, type=ExchangeType.DIRECT, durable=True
72+
name=exchange_name,
73+
type=ExchangeType.DIRECT,
74+
durable=True,
75+
arguments=self._arguments,
7076
)
7177

7278
case (exchange_name, "topic"):
7379
self._target_exchange = await self._channel.declare_exchange(
74-
name=exchange_name, type=ExchangeType.TOPIC, durable=True
80+
name=exchange_name,
81+
type=ExchangeType.TOPIC,
82+
durable=True,
83+
arguments=self._arguments,
7584
)
7685

7786
case (exchange_name, "fanout"):
7887
self._target_exchange = await self._channel.declare_exchange(
79-
name=exchange_name, type=ExchangeType.FANOUT, durable=True
88+
name=exchange_name,
89+
type=ExchangeType.FANOUT,
90+
durable=True,
91+
arguments=self._arguments,
8092
)
8193

8294
case (exchange_name, "headers"):
8395
self._target_exchange = await self._channel.declare_exchange(
84-
name=exchange_name, type=ExchangeType.HEADERS, durable=True
96+
name=exchange_name,
97+
type=ExchangeType.HEADERS,
98+
durable=True,
99+
arguments=self._arguments,
85100
)
86101

87102
case (exchange_name, unknown_type):

src/asyncapi_python/contrib/wire/amqp/resolver.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ def resolve_amqp_config(
116116
"exclusive": True,
117117
"auto_delete": True,
118118
},
119+
arguments={},
119120
)
120121

121122
# Reply channel with explicit address - check if direct queue or topic exchange
@@ -133,6 +134,7 @@ def resolve_amqp_config(
133134
"exclusive": True,
134135
"auto_delete": True,
135136
},
137+
arguments={},
136138
)
137139
else:
138140
# Topic-based reply pattern - shared exchange with filtering
@@ -143,6 +145,7 @@ def resolve_amqp_config(
143145
routing_key=app_id, # Filter messages by app_id
144146
binding_type=AmqpBindingType.REPLY,
145147
queue_properties={"durable": True, "exclusive": False},
148+
arguments={},
146149
)
147150

148151
# Reply channel with binding - defer to binding resolution
@@ -192,6 +195,7 @@ def resolve_amqp_config(
192195
routing_key=resolved_address,
193196
binding_type=AmqpBindingType.QUEUE,
194197
queue_properties={"durable": True, "exclusive": False},
198+
arguments={},
195199
)
196200

197201
# Operation name pattern (fallback)
@@ -204,6 +208,7 @@ def resolve_amqp_config(
204208
routing_key=op_name,
205209
binding_type=AmqpBindingType.QUEUE,
206210
queue_properties={"durable": True, "exclusive": False},
211+
arguments={},
207212
)
208213

209214
# No match - reject creation
@@ -245,20 +250,24 @@ def resolve_queue_binding(
245250
# Extract queue properties
246251
queue_config = getattr(binding, "queue", None)
247252
queue_properties = {"durable": True, "exclusive": False} # Defaults
253+
arguments: dict[str, Any] = {}
248254
if queue_config:
249255
if hasattr(queue_config, "durable"):
250256
queue_properties["durable"] = queue_config.durable
251257
if hasattr(queue_config, "exclusive"):
252258
queue_properties["exclusive"] = queue_config.exclusive
253259
if hasattr(queue_config, "auto_delete"):
254260
queue_properties["auto_delete"] = queue_config.auto_delete
261+
if hasattr(queue_config, "arguments") and queue_config.arguments:
262+
arguments = queue_config.arguments
255263

256264
return AmqpConfig(
257265
queue_name=queue_name,
258266
exchange_name="", # Queue bindings use default exchange
259267
routing_key=queue_name, # For default exchange, routing_key = queue_name
260268
binding_type=AmqpBindingType.QUEUE,
261269
queue_properties=queue_properties,
270+
arguments=arguments,
262271
)
263272

264273

@@ -303,6 +312,15 @@ def resolve_routing_key_binding(
303312
if exchange_config and hasattr(exchange_config, "type"):
304313
exchange_type = exchange_config.type
305314

315+
# Extract exchange arguments
316+
arguments: dict[str, Any] = {}
317+
if (
318+
exchange_config
319+
and hasattr(exchange_config, "arguments")
320+
and exchange_config.arguments
321+
):
322+
arguments = exchange_config.arguments
323+
306324
# Determine routing key - this is where wildcards are allowed
307325
match (getattr(binding, "routingKey", None), channel.address, operation_name):
308326
case (routing_key, _, _) if routing_key:
@@ -327,6 +345,7 @@ def resolve_routing_key_binding(
327345
routing_key=resolved_routing_key,
328346
binding_type=AmqpBindingType.ROUTING_KEY,
329347
queue_properties={"durable": False, "exclusive": True, "auto_delete": True},
348+
arguments=arguments,
330349
)
331350

332351

@@ -366,6 +385,15 @@ def resolve_exchange_binding(
366385
if exchange_config and hasattr(exchange_config, "type"):
367386
exchange_type = exchange_config.type
368387

388+
# Extract exchange arguments
389+
arguments: dict[str, Any] = {}
390+
if (
391+
exchange_config
392+
and hasattr(exchange_config, "arguments")
393+
and exchange_config.arguments
394+
):
395+
arguments = exchange_config.arguments
396+
369397
# Extract binding arguments for headers exchange from dataclass
370398
binding_args: dict[str, Any] = {}
371399
# Note: bindingKeys is not part of AmqpChannelBinding spec
@@ -379,4 +407,5 @@ def resolve_exchange_binding(
379407
binding_type=AmqpBindingType.EXCHANGE,
380408
queue_properties={"durable": False, "exclusive": True, "auto_delete": True},
381409
binding_arguments=binding_args,
410+
arguments=arguments,
382411
)

src/asyncapi_python/kernel/document/bindings.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@ class AmqpExchange:
2626
durable: Optional[bool] = None
2727
auto_delete: Optional[bool] = None
2828
vhost: Optional[str] = None
29+
arguments: Optional[Dict[str, Any]] = None
2930

3031
def __repr__(self) -> str:
3132
"""Custom repr to handle enum properly for code generation."""
3233
from asyncapi_python.kernel.document.bindings import AmqpExchangeType
3334

3435
_ = AmqpExchangeType # Explicitly reference the import
35-
return f"spec.AmqpExchange(name={self.name!r}, type=spec.AmqpExchangeType.{self.type.name}, durable={self.durable!r}, auto_delete={self.auto_delete!r}, vhost={self.vhost!r})"
36+
return f"spec.AmqpExchange(name={self.name!r}, type=spec.AmqpExchangeType.{self.type.name}, durable={self.durable!r}, auto_delete={self.auto_delete!r}, vhost={self.vhost!r}, arguments={self.arguments!r})"
3637

3738

3839
@dataclass
@@ -44,10 +45,11 @@ class AmqpQueue:
4445
exclusive: Optional[bool] = None
4546
auto_delete: Optional[bool] = None
4647
vhost: Optional[str] = None
48+
arguments: Optional[Dict[str, Any]] = None
4749

4850
def __repr__(self) -> str:
4951
"""Custom repr for code generation."""
50-
return f"spec.AmqpQueue(name={self.name!r}, durable={self.durable!r}, exclusive={self.exclusive!r}, auto_delete={self.auto_delete!r}, vhost={self.vhost!r})"
52+
return f"spec.AmqpQueue(name={self.name!r}, durable={self.durable!r}, exclusive={self.exclusive!r}, auto_delete={self.auto_delete!r}, vhost={self.vhost!r}, arguments={self.arguments!r})"
5153

5254

5355
@dataclass
@@ -159,6 +161,7 @@ def create_amqp_binding_from_dict(binding_dict: Dict[str, Any]) -> AmqpChannelBi
159161
exclusive=queue_config.get("exclusive"),
160162
auto_delete=queue_config.get("auto_delete"),
161163
vhost=queue_config.get("vhost"),
164+
arguments=queue_config.get("arguments"),
162165
)
163166
elif binding_type == "routingKey" and "exchange" in binding_dict:
164167
exchange_config = binding_dict["exchange"]
@@ -176,6 +179,7 @@ def create_amqp_binding_from_dict(binding_dict: Dict[str, Any]) -> AmqpChannelBi
176179
durable=exchange_config.get("durable"),
177180
auto_delete=exchange_config.get("auto_delete"),
178181
vhost=exchange_config.get("vhost"),
182+
arguments=exchange_config.get("arguments"),
179183
)
180184

181185
return binding

0 commit comments

Comments
 (0)