Skip to content

Commit 3372b3c

Browse files
authored
Fix producer parameter validation (#33)
* Fix edge case * Add tests * Move parameter count validation from amqp to receiver plane * Increment version
1 parent 9aaee89 commit 3372b3c

File tree

6 files changed

+18
-172
lines changed

6 files changed

+18
-172
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "asyncapi-python"
3-
version = "0.3.0rc6"
3+
version = "0.3.0rc8"
44
license = { text = "Apache-2.0" }
55
description = "Easily generate type-safe and async Python applications from AsyncAPI 3 specifications."
66
authors = [{ name = "Yaroslav Petrov", email = "[email protected]" }]

src/asyncapi_python/contrib/wire/amqp/resolver.py

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@
66
from asyncapi_python.kernel.document.bindings import AmqpChannelBinding
77
from asyncapi_python.kernel.document.channel import Channel
88
from asyncapi_python.kernel.wire import EndpointParams
9-
from asyncapi_python.kernel.wire.utils import (
10-
substitute_parameters,
11-
validate_parameters_strict,
12-
)
9+
from asyncapi_python.kernel.wire.utils import substitute_parameters
1310

1411
from .config import AmqpBindingType, AmqpConfig
1512

@@ -186,8 +183,7 @@ def resolve_amqp_config(
186183

187184
# Channel address pattern (with parameter substitution)
188185
case (False, None, address, _) if address:
189-
# Strict validation for implicit queue binding
190-
validate_parameters_strict(channel, param_values)
186+
# Validate no wildcards for implicit queue binding
191187
_validate_no_wildcards_in_queue(param_values)
192188
resolved_address = substitute_parameters(address, param_values)
193189
return AmqpConfig(
@@ -200,8 +196,7 @@ def resolve_amqp_config(
200196

201197
# Operation name pattern (fallback)
202198
case (False, None, None, op_name) if op_name:
203-
# Strict validation for implicit queue binding
204-
validate_parameters_strict(channel, param_values)
199+
# Validate no wildcards for implicit queue binding
205200
_validate_no_wildcards_in_queue(param_values)
206201
return AmqpConfig(
207202
queue_name=op_name,
@@ -228,13 +223,9 @@ def resolve_queue_binding(
228223
"""Resolve AMQP queue binding configuration
229224
230225
Queue bindings require:
231-
- All channel parameters must be provided (strict validation)
232226
- No wildcards allowed in parameter values
233227
"""
234228

235-
# Strict validation: all parameters required, exact match
236-
validate_parameters_strict(channel, param_values)
237-
238229
# Validate no wildcards in queue binding parameters
239230
_validate_no_wildcards_in_queue(param_values)
240231

@@ -280,14 +271,10 @@ def resolve_routing_key_binding(
280271
"""Resolve AMQP routing key binding configuration for pub/sub patterns
281272
282273
For routing key bindings:
283-
- All channel-defined parameters must be provided (strict validation)
284274
- Parameter values can explicitly contain wildcards ('*' or '#')
285275
- Wildcards are allowed for topic exchange pattern matching
286276
"""
287277

288-
# Strict validation: all parameters required, exact match
289-
validate_parameters_strict(channel, param_values)
290-
291278
# Determine exchange name and type
292279
# For exchange name, we need concrete values (no wildcards)
293280
# If param_values has placeholders, use them; otherwise use literal exchange name

src/asyncapi_python/kernel/endpoint/rpc_server.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing_extensions import Unpack
55

66
from asyncapi_python.kernel.wire import Consumer, Producer
7+
from asyncapi_python.kernel.wire.utils import validate_parameters_strict
78

89
from ..exceptions import Reject
910
from ..typing import (
@@ -63,6 +64,11 @@ async def start(self, **params: Unpack[AbstractEndpoint.StartParams]) -> None:
6364
if not self._reply_codecs:
6465
raise RuntimeError("RPC server operation has no reply messages defined")
6566

67+
# Validate subscription parameters before creating consumer
68+
validate_parameters_strict(
69+
self._operation.channel, self._subscription_parameters
70+
)
71+
6672
# Create consumer for receiving requests
6773
self._consumer = await self._wire.create_consumer(
6874
channel=self._operation.channel,

src/asyncapi_python/kernel/endpoint/subscriber.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing_extensions import Unpack
55

66
from asyncapi_python.kernel.wire import Consumer
7+
from asyncapi_python.kernel.wire.utils import validate_parameters_strict
78

89
from ..exceptions import Reject
910
from ..typing import BatchConfig, BatchConsumer, Handler, IncomingMessage, T_Input
@@ -46,6 +47,11 @@ async def start(self, **params: Unpack[AbstractEndpoint.StartParams]) -> None:
4647
f"Use @{self._operation.key} decorator to register a handler function."
4748
)
4849

50+
# Validate subscription parameters before creating consumer
51+
validate_parameters_strict(
52+
self._operation.channel, self._subscription_parameters
53+
)
54+
4955
# Create consumer from wire factory
5056
self._consumer = await self._wire.create_consumer(
5157
channel=self._operation.channel,
@@ -331,3 +337,4 @@ async def process_batch():
331337
# If processing remaining batch fails, just nack all and continue
332338
for _, wire_message in batch:
333339
await wire_message.nack()
340+
await wire_message.nack()

tests/core/wire/test_amqp_resolver.py

Lines changed: 0 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -196,157 +196,3 @@ def test_resolve_channel_address_without_binding_accepts_concrete_params():
196196

197197
# Verify the address was substituted with concrete value
198198
assert config.queue_name == "task.high"
199-
200-
201-
def test_resolve_amqp_config_rejects_missing_parameters():
202-
"""Resolver should validate all required parameters are provided."""
203-
from asyncapi_python.kernel.document.channel import AddressParameter
204-
205-
# Create channel with 2 required parameters
206-
channel = create_test_channel(
207-
address="weather.{location}.{severity}",
208-
binding=AmqpChannelBinding(type="queue"),
209-
)
210-
# Add parameter definitions to channel
211-
channel = Channel(
212-
key=channel.key,
213-
address=channel.address,
214-
title=channel.title,
215-
summary=channel.summary,
216-
description=channel.description,
217-
servers=channel.servers,
218-
messages=channel.messages,
219-
parameters={
220-
"location": AddressParameter(
221-
key="location",
222-
description="Geographic location",
223-
location=None,
224-
),
225-
"severity": AddressParameter(
226-
key="severity",
227-
description="Alert severity",
228-
location=None,
229-
),
230-
},
231-
tags=channel.tags,
232-
external_docs=channel.external_docs,
233-
bindings=channel.bindings,
234-
)
235-
236-
params: EndpointParams = {
237-
"channel": channel,
238-
"parameters": {"location": "NYC"}, # Missing severity
239-
"op_bindings": None,
240-
"is_reply": False,
241-
}
242-
243-
with pytest.raises(ValueError) as exc_info:
244-
resolve_amqp_config(params, "test_op", "test_app")
245-
246-
error_msg = str(exc_info.value)
247-
assert "Missing required parameters" in error_msg
248-
assert "severity" in error_msg
249-
250-
251-
def test_resolve_amqp_config_rejects_extra_parameters():
252-
"""Resolver should reject extra parameters not defined in channel."""
253-
from asyncapi_python.kernel.document.channel import AddressParameter
254-
255-
# Create channel with 1 required parameter
256-
channel = create_test_channel(
257-
address="weather.{location}",
258-
binding=AmqpChannelBinding(type="queue"),
259-
)
260-
# Add parameter definition to channel
261-
channel = Channel(
262-
key=channel.key,
263-
address=channel.address,
264-
title=channel.title,
265-
summary=channel.summary,
266-
description=channel.description,
267-
servers=channel.servers,
268-
messages=channel.messages,
269-
parameters={
270-
"location": AddressParameter(
271-
key="location",
272-
description="Geographic location",
273-
location=None,
274-
),
275-
},
276-
tags=channel.tags,
277-
external_docs=channel.external_docs,
278-
bindings=channel.bindings,
279-
)
280-
281-
params: EndpointParams = {
282-
"channel": channel,
283-
"parameters": {
284-
"location": "NYC",
285-
"severity": "high", # Extra parameter
286-
},
287-
"op_bindings": None,
288-
"is_reply": False,
289-
}
290-
291-
with pytest.raises(ValueError) as exc_info:
292-
resolve_amqp_config(params, "test_op", "test_app")
293-
294-
error_msg = str(exc_info.value)
295-
assert "Unexpected parameters" in error_msg
296-
assert "severity" in error_msg
297-
298-
299-
def test_resolve_amqp_config_routing_key_rejects_missing_parameters():
300-
"""Routing key bindings should also validate all parameters are provided."""
301-
from asyncapi_python.kernel.document.channel import AddressParameter
302-
303-
# Create channel with routingKey binding
304-
channel = create_test_channel(
305-
address="weather.{location}.{severity}",
306-
binding=AmqpChannelBinding(
307-
type="routingKey",
308-
exchange=AmqpExchange(
309-
name="weather_exchange",
310-
type=AmqpExchangeType.TOPIC,
311-
),
312-
),
313-
)
314-
# Add parameter definitions
315-
channel = Channel(
316-
key=channel.key,
317-
address=channel.address,
318-
title=channel.title,
319-
summary=channel.summary,
320-
description=channel.description,
321-
servers=channel.servers,
322-
messages=channel.messages,
323-
parameters={
324-
"location": AddressParameter(
325-
key="location",
326-
description="Geographic location",
327-
location=None,
328-
),
329-
"severity": AddressParameter(
330-
key="severity",
331-
description="Alert severity",
332-
location=None,
333-
),
334-
},
335-
tags=channel.tags,
336-
external_docs=channel.external_docs,
337-
bindings=channel.bindings,
338-
)
339-
340-
params: EndpointParams = {
341-
"channel": channel,
342-
"parameters": {"location": "NYC"}, # Missing severity
343-
"op_bindings": None,
344-
"is_reply": False,
345-
}
346-
347-
with pytest.raises(ValueError) as exc_info:
348-
resolve_amqp_config(params, "test_op", "test_app")
349-
350-
error_msg = str(exc_info.value)
351-
assert "Missing required parameters" in error_msg
352-
assert "severity" in error_msg

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)