Skip to content

Commit a646a04

Browse files
authored
Incorrect reply routing (#25)
* Fix incorrect dynamic reply queue routing * Update lockfile * Fix pyright * Update resolver logic * Move service_name to Application from wire * Drop service name param from amqp wire * Move service name to endpoint params * Add default timeout
1 parent 85c0fc6 commit a646a04

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+584
-268
lines changed

src/asyncapi_python/contrib/codec/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,4 @@
22

33
from .registry import CodecRegistry
44

5-
65
__all__ = ["CodecRegistry"]

src/asyncapi_python/contrib/codec/json.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import json
2-
from typing import Type, ClassVar
32
from types import ModuleType
3+
from typing import ClassVar, Type
44

55
from pydantic import BaseModel, ValidationError
66

src/asyncapi_python/contrib/codec/registry.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
from typing import ClassVar, Any
21
from types import ModuleType
3-
from asyncapi_python.kernel.codec import CodecFactory, Codec
2+
from typing import Any, ClassVar
3+
4+
from asyncapi_python.kernel.codec import Codec, CodecFactory
45
from asyncapi_python.kernel.document.message import Message
6+
57
from .json import JsonCodecFactory
68

79

src/asyncapi_python/contrib/wire/amqp/consumer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
try:
77
from aio_pika import ExchangeType # type: ignore[import-not-found]
88
from aio_pika.abc import ( # type: ignore[import-not-found]
9-
AbstractConnection,
109
AbstractChannel,
11-
AbstractQueue,
10+
AbstractConnection,
1211
AbstractExchange,
12+
AbstractQueue,
1313
)
1414
except ImportError as e:
1515
raise ImportError(

src/asyncapi_python/contrib/wire/amqp/factory.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
"""AMQP wire factory implementation"""
22

33
import secrets
4-
from typing import Optional, Callable, Any, cast
4+
from typing import Any, Callable, Optional, cast
5+
56
from typing_extensions import Unpack
67

78
try:
@@ -13,11 +14,11 @@
1314
) from e
1415

1516
from asyncapi_python.kernel.wire import AbstractWireFactory, EndpointParams
16-
from asyncapi_python.kernel.wire.typing import Producer, Consumer
17+
from asyncapi_python.kernel.wire.typing import Consumer, Producer
1718

18-
from .message import AmqpWireMessage, AmqpIncomingMessage
19-
from .producer import AmqpProducer
2019
from .consumer import AmqpConsumer
20+
from .message import AmqpIncomingMessage, AmqpWireMessage
21+
from .producer import AmqpProducer
2122
from .resolver import resolve_amqp_config
2223

2324

@@ -31,7 +32,6 @@ class AmqpWire(AbstractWireFactory[AmqpWireMessage, AmqpIncomingMessage]):
3132
def __init__(
3233
self,
3334
connection_url: str,
34-
service_name: str = "app",
3535
robust: bool = False,
3636
reconnect_interval: float = 1.0,
3737
max_reconnect_interval: float = 60.0,
@@ -45,7 +45,6 @@ def __init__(
4545
4646
Args:
4747
connection_url: AMQP connection URL
48-
service_name: Service name prefix for app_id
4948
robust: Enable robust connection with auto-reconnect (default: False)
5049
reconnect_interval: Initial reconnect interval in seconds (for robust mode)
5150
max_reconnect_interval: Maximum reconnect interval in seconds (for robust mode)
@@ -55,9 +54,10 @@ def __init__(
5554
on_connection_lost: Callback when connection is lost (for non-robust mode)
5655
"""
5756
self._connection_url = connection_url
58-
# Generate app_id with service name plus 8 random hex characters
57+
# Generate fallback app_id with random hex characters
58+
# Note: For RPC, app_id should be provided via EndpointParams from application level
5959
random_hex = secrets.token_hex(4) # 4 bytes = 8 hex chars
60-
self._app_id = f"{service_name}-{random_hex}"
60+
self._app_id = f"wire-{random_hex}"
6161
self._connection: AbstractConnection | None = None
6262
self._robust = robust
6363
self._reconnect_interval = reconnect_interval
@@ -135,8 +135,12 @@ async def create_consumer(
135135
# Generate operation name from available information
136136
operation_name = self._generate_operation_name(kwargs)
137137

138+
# Use provided app_id if available, otherwise use instance app_id
139+
# This allows application-level control over queue naming
140+
app_id = kwargs.get("app_id", self._app_id)
141+
138142
# Resolve AMQP configuration using pattern matching
139-
config = resolve_amqp_config(kwargs, operation_name, self._app_id)
143+
config = resolve_amqp_config(kwargs, operation_name, app_id)
140144

141145
connection = await self._get_connection()
142146

@@ -154,8 +158,12 @@ async def create_producer(
154158
# Generate operation name from available information
155159
operation_name = self._generate_operation_name(kwargs)
156160

161+
# Use provided app_id if available, otherwise use instance app_id
162+
# This allows application-level control over queue naming
163+
app_id = kwargs.get("app_id", self._app_id)
164+
157165
# Resolve AMQP configuration using pattern matching
158-
config = resolve_amqp_config(kwargs, operation_name, self._app_id)
166+
config = resolve_amqp_config(kwargs, operation_name, app_id)
159167

160168
connection = await self._get_connection()
161169

src/asyncapi_python/contrib/wire/amqp/producer.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
from typing import Any
44

55
try:
6-
from aio_pika import Message as AmqpMessage, ExchangeType # type: ignore[import-not-found]
6+
from aio_pika import ExchangeType
7+
from aio_pika import Message as AmqpMessage # type: ignore[import-not-found]
78
from aio_pika.abc import ( # type: ignore[import-not-found]
8-
AbstractConnection,
99
AbstractChannel,
10+
AbstractConnection,
1011
AbstractExchange,
1112
)
1213
except ImportError as e:
@@ -100,11 +101,38 @@ async def stop(self) -> None:
100101

101102
self._started = False
102103

103-
async def send_batch(self, messages: list[AmqpWireMessage]) -> None:
104-
"""Send a batch of messages using the configured exchange"""
104+
async def send_batch(
105+
self, messages: list[AmqpWireMessage], *, address_override: str | None = None
106+
) -> None:
107+
"""Send a batch of messages using the configured exchange
108+
109+
Args:
110+
messages: Messages to send
111+
address_override: Optional dynamic routing key/queue to override static config.
112+
If provided, overrides self._routing_key for this send operation.
113+
If None, uses static routing_key from configuration/bindings.
114+
"""
105115
if not self._started or not self._channel or not self._target_exchange:
106116
raise RuntimeError("Producer not started")
107117

118+
# Determine effective routing key: override takes precedence over static config
119+
effective_routing_key = (
120+
address_override if address_override is not None else self._routing_key
121+
)
122+
123+
# Validate we have a destination
124+
# Fail ONLY if both are truly missing:
125+
# - address_override is None (not provided by caller)
126+
# - AND self._routing_key is "" (no static config was derived from channel/bindings/operation)
127+
# Note: empty string IS valid when explicitly configured (fanout exchanges, default exchange)
128+
if address_override is None and not self._routing_key:
129+
raise ValueError(
130+
f"Cannot send: no routing destination available. "
131+
f"RPC replies require reply_to from the request, or the channel must "
132+
f"have address/bindings/operation-name to derive destination. "
133+
f"(address_override={address_override}, routing_key={self._routing_key!r})"
134+
)
135+
108136
for message in messages:
109137
amqp_message = AmqpMessage(
110138
body=message.payload,
@@ -113,8 +141,8 @@ async def send_batch(self, messages: list[AmqpWireMessage]) -> None:
113141
reply_to=message.reply_to,
114142
)
115143

116-
# Publish to the configured target exchange (not always default)
144+
# Publish to the configured target exchange with dynamic or static routing key
117145
await self._target_exchange.publish(
118146
amqp_message,
119-
routing_key=self._routing_key,
147+
routing_key=effective_routing_key,
120148
)

src/asyncapi_python/contrib/wire/amqp/resolver.py

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
"""Binding resolution with comprehensive pattern matching"""
22

33
from typing import Any
4-
from asyncapi_python.kernel.wire import EndpointParams
5-
from asyncapi_python.kernel.document.channel import Channel
4+
65
from asyncapi_python.kernel.document.bindings import AmqpChannelBinding
6+
from asyncapi_python.kernel.document.channel import Channel
7+
from asyncapi_python.kernel.wire import EndpointParams
78

8-
from .config import AmqpConfig, AmqpBindingType
9-
from .utils import validate_parameters_strict, substitute_parameters
9+
from .config import AmqpBindingType, AmqpConfig
10+
from .utils import substitute_parameters, validate_parameters_strict
1011

1112

1213
def resolve_amqp_config(
@@ -57,17 +58,32 @@ def resolve_amqp_config(
5758
},
5859
)
5960

60-
# Reply channel with explicit address - shared channel with filtering
61+
# Reply channel with explicit address - check if direct queue or topic exchange
6162
case (True, _, address, _) if address:
6263
resolved_address = substitute_parameters(address, param_values)
63-
return AmqpConfig(
64-
queue_name=f"reply-{app_id}", # App-specific reply queue
65-
exchange_name=resolved_address, # Shared exchange for replies
66-
exchange_type="topic", # Enable pattern matching for filtering
67-
routing_key=app_id, # Filter messages by app_id
68-
binding_type=AmqpBindingType.REPLY,
69-
queue_properties={"durable": True, "exclusive": False},
70-
)
64+
# If address starts with "reply-", treat it as a direct queue name (RPC pattern)
65+
if resolved_address.startswith("reply-"):
66+
return AmqpConfig(
67+
queue_name=resolved_address, # Use address as queue name
68+
exchange_name="", # Default exchange for direct routing
69+
routing_key=resolved_address, # Route directly to queue
70+
binding_type=AmqpBindingType.REPLY,
71+
queue_properties={
72+
"durable": False,
73+
"exclusive": True,
74+
"auto_delete": True,
75+
},
76+
)
77+
else:
78+
# Topic-based reply pattern - shared exchange with filtering
79+
return AmqpConfig(
80+
queue_name=f"reply-{app_id}", # App-specific reply queue
81+
exchange_name=resolved_address, # Shared exchange for replies
82+
exchange_type="topic", # Enable pattern matching for filtering
83+
routing_key=app_id, # Filter messages by app_id
84+
binding_type=AmqpBindingType.REPLY,
85+
queue_properties={"durable": True, "exclusive": False},
86+
)
7187

7288
# Reply channel with binding - defer to binding resolution
7389
case (True, binding, _, _) if binding and binding.type == "queue":

src/asyncapi_python/contrib/wire/amqp/utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# TODO: This thing should be general wire utils, not tied to specific wire
44

55
import re
6+
67
from asyncapi_python.kernel.document.channel import Channel
78

89

src/asyncapi_python/contrib/wire/in_memory.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
from collections import defaultdict, deque
55
from dataclasses import dataclass, field
66
from typing import Any, AsyncGenerator
7+
78
from typing_extensions import Unpack
89

910
from asyncapi_python.kernel.wire import AbstractWireFactory, EndpointParams
10-
from asyncapi_python.kernel.wire.typing import Producer, Consumer
11+
from asyncapi_python.kernel.wire.typing import Consumer, Producer
1112

1213

1314
@dataclass
@@ -142,13 +143,34 @@ async def stop(self) -> None:
142143
"""Stop the producer"""
143144
self._started = False
144145

145-
async def send_batch(self, messages: list[InMemoryMessage]) -> None:
146-
"""Send a batch of messages to the channel"""
146+
async def send_batch(
147+
self, messages: list[InMemoryMessage], *, address_override: str | None = None
148+
) -> None:
149+
"""Send a batch of messages to the channel
150+
151+
Args:
152+
messages: Messages to send
153+
address_override: Optional dynamic channel name to override static config.
154+
If provided, overrides self._channel_name for this send operation.
155+
If None, uses static channel_name from configuration.
156+
"""
147157
if not self._started:
148158
raise RuntimeError("Producer not started")
149159

160+
# Determine effective channel: override takes precedence over static config
161+
effective_channel = (
162+
address_override if address_override is not None else self._channel_name
163+
)
164+
165+
# Validate we have a destination
166+
if not effective_channel:
167+
raise ValueError(
168+
f"Cannot send: no channel specified. "
169+
f"address_override={address_override}, channel_name={self._channel_name}"
170+
)
171+
150172
for message in messages:
151-
await _bus.publish(self._channel_name, message)
173+
await _bus.publish(effective_channel, message)
152174

153175

154176
class InMemoryConsumer(Consumer[InMemoryIncomingMessage]):

src/asyncapi_python/kernel/application.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import asyncio
2-
from typing import TypedDict, Any
3-
from typing_extensions import Unpack, Required, NotRequired
2+
from typing import Any, TypedDict
3+
4+
from typing_extensions import NotRequired, Required, Unpack
45

56
from asyncapi_python.kernel.document.operation import Operation
67
from asyncapi_python.kernel.wire import AbstractWireFactory
8+
9+
from .codec import CodecFactory
710
from .endpoint import AbstractEndpoint, EndpointFactory
811
from .endpoint.abc import EndpointParams
9-
from .codec import CodecFactory
1012

1113

1214
class BaseApplication:

0 commit comments

Comments
 (0)