Skip to content

Commit 6042723

Browse files
authored
Implement channel parameter handling (#30)
* Increment version * Add basic doc validation system * Update lock * Add extraction logic * Disable channel id parametrization * Warn of channel address redundancy * Add tests * Add field extraction logic for codecs * Integrate parameters into rpc and normal publishers * Refactor publish/rpc_client to share code * Split endpoint/abc * Add param validation for channels * Add topic example * Finalize topic example * Format files * Format tests * Fix tests
1 parent d512f40 commit 6042723

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+4261
-171
lines changed

examples/amqp-topic/.gitignore

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Generated code directories
2+
publisher/
3+
subscriber1/
4+
subscriber2/
5+
6+
# Virtual environment
7+
.venv/

examples/amqp-topic/Makefile

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
.PHONY: venv install generate publisher subscriber1 subscriber2 clean help
2+
3+
# Virtual environment
4+
VENV_NAME := .venv
5+
PYTHON := $(VENV_NAME)/bin/python
6+
PIP := $(VENV_NAME)/bin/pip
7+
CODEGEN := $(VENV_NAME)/bin/asyncapi-python-codegen
8+
9+
help:
10+
@echo "Available targets:"
11+
@echo " make venv - Create virtual environment"
12+
@echo " make install - Install dependencies"
13+
@echo " make generate - Generate code from AsyncAPI specs"
14+
@echo " make publisher - Run the publisher"
15+
@echo " make subscriber1 - Run subscriber 1"
16+
@echo " make subscriber2 - Run subscriber 2"
17+
@echo " make clean - Remove virtual environment and generated code"
18+
19+
venv:
20+
@echo "Creating virtual environment..."
21+
python3 -m venv $(VENV_NAME)
22+
@echo "✅ Virtual environment created"
23+
24+
install: venv
25+
@echo "Installing dependencies..."
26+
$(PIP) install -e ../../[amqp,codegen]
27+
@echo "✅ Dependencies installed"
28+
29+
generate: install
30+
@echo "Generating publisher code..."
31+
$(CODEGEN) spec/publisher.asyncapi.yaml publisher --force
32+
@echo "✅ Publisher code generated"
33+
@echo ""
34+
@echo "Generating subscriber1 code..."
35+
$(CODEGEN) spec/subscriber1.asyncapi.yaml subscriber1 --force
36+
@echo "✅ Subscriber1 code generated"
37+
@echo ""
38+
@echo "Generating subscriber2 code..."
39+
$(CODEGEN) spec/subscriber2.asyncapi.yaml subscriber2 --force
40+
@echo "✅ Subscriber2 code generated"
41+
42+
publisher: generate
43+
@echo "Starting publisher..."
44+
@echo ""
45+
$(PYTHON) main-publisher.py
46+
47+
subscriber1: generate
48+
@echo "Starting subscriber 1..."
49+
@echo ""
50+
$(PYTHON) main-subscriber1.py
51+
52+
subscriber2: generate
53+
@echo "Starting subscriber 2..."
54+
@echo ""
55+
$(PYTHON) main-subscriber2.py
56+
57+
clean:
58+
@echo "Cleaning up..."
59+
rm -rf $(VENV_NAME)
60+
rm -rf publisher/
61+
rm -rf subscriber1/
62+
rm -rf subscriber2/
63+
rm -rf __pycache__
64+
@echo "✅ Cleanup complete"

examples/amqp-topic/README.md

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
# AMQP Topic Exchange Example
2+
3+
Demonstrates **parameterized channels with wildcard subscriptions** using AMQP topic exchanges.
4+
5+
## Overview
6+
7+
Weather alert system showing:
8+
- Publishers send to specific routing keys (concrete parameters)
9+
- Subscribers use wildcards (`*` and `#`) for pattern matching
10+
- Topic exchange routes messages based on routing key patterns
11+
12+
## Architecture
13+
14+
```
15+
Topic Exchange: weather_alerts
16+
Channel: weather.{location}.{severity}
17+
18+
Routing Keys:
19+
weather.NYC.high
20+
weather.LA.low
21+
weather.CHI.critical
22+
```
23+
24+
## Project Structure
25+
26+
```
27+
examples/amqp-topic/
28+
├── spec/
29+
│ ├── common.asyncapi.yaml # Shared channel/message definitions
30+
│ ├── publisher.asyncapi.yaml # Publisher app spec
31+
│ ├── subscriber1.asyncapi.yaml # Subscriber 1 spec
32+
│ └── subscriber2.asyncapi.yaml # Subscriber 2 spec
33+
├── main-publisher.py # Publisher implementation
34+
├── main-subscriber1.py # Subscriber 1 implementation
35+
├── main-subscriber2.py # Subscriber 2 implementation
36+
├── Makefile # Build and run commands
37+
└── README.md
38+
```
39+
40+
## Usage
41+
42+
### 1. Generate Code
43+
44+
```bash
45+
make generate
46+
```
47+
48+
This generates type-safe Python code from AsyncAPI specs:
49+
- `publisher/` - from `spec/publisher.asyncapi.yaml`
50+
- `subscriber1/` - from `spec/subscriber1.asyncapi.yaml`
51+
- `subscriber2/` - from `spec/subscriber2.asyncapi.yaml`
52+
53+
### 2. Run Publisher
54+
55+
```bash
56+
make publisher
57+
```
58+
59+
Publishes weather alerts to the topic exchange.
60+
61+
### 3. Run Subscribers
62+
63+
Terminal 1:
64+
```bash
65+
make subscriber1
66+
```
67+
68+
Terminal 2:
69+
```bash
70+
make subscriber2
71+
```
72+
73+
## Key Features
74+
75+
### Parameterized Channels
76+
77+
Channel address: `weather.{location}.{severity}`
78+
79+
Parameters are extracted from message payload:
80+
```python
81+
WeatherAlert(
82+
location="NYC", # → {location}
83+
severity="high", # → {severity}
84+
...
85+
)
86+
# Creates routing key: weather.NYC.high
87+
```
88+
89+
### Wildcard Subscriptions
90+
91+
Subscribers can use AMQP wildcards for pattern matching:
92+
- `*` - Matches exactly one word
93+
- `#` - Matches zero or more words
94+
95+
**This Example**:
96+
- **Subscriber 1**: `weather.NYC.*` - All NYC alerts (any severity)
97+
- Uses `parameters={"location": "NYC"}`
98+
- Receives: NYC-HIGH
99+
- **Subscriber 2**: `weather.*.critical` - Critical alerts (any location)
100+
- Uses `parameters={"severity": "critical"}`
101+
- Receives: CHI-CRITICAL
102+
103+
**Other Possible Patterns**:
104+
- `weather.LA.*` - All LA alerts
105+
- `weather.*.high` - High severity alerts from any location
106+
- `weather.*.*` - ALL weather alerts (empty parameters)
107+
108+
### Parameter Validation
109+
110+
The runtime enforces:
111+
- ✅ All required parameters must be provided
112+
- ✅ Exact match required (strict validation)
113+
- ✅ Queue bindings reject wildcards (concrete values only)
114+
- ✅ Routing key bindings accept wildcards (pattern matching)
115+
116+
## Development
117+
118+
### Clean Up
119+
120+
```bash
121+
make clean
122+
```
123+
124+
Removes virtual environment and generated code.
125+
126+
### Help
127+
128+
```bash
129+
make help
130+
```
131+
132+
Shows available Makefile targets.
133+
134+
## Learn More
135+
136+
- [AsyncAPI Specification](https://www.asyncapi.com/)
137+
- [AMQP Topic Exchanges](https://www.rabbitmq.com/tutorials/tutorial-five-python.html)
138+
- [AsyncAPI Python Documentation](https://github.com/yourorg/asyncapi-python)
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Weather Alert Publisher
4+
5+
Publishes weather alerts to an AMQP topic exchange with dynamic routing keys.
6+
The routing key is built from the message payload fields (location and severity).
7+
8+
Example usage:
9+
python main-publisher.py
10+
"""
11+
12+
import asyncio
13+
from datetime import datetime, timezone
14+
from os import environ
15+
16+
from asyncapi_python.contrib.wire.amqp import AmqpWire
17+
from publisher import Application
18+
from publisher.messages.json import WeatherAlert, Severity
19+
20+
# AMQP connection URI (can be overridden via environment variable)
21+
AMQP_URI = environ.get("AMQP_URI", "amqp://guest:guest@localhost")
22+
23+
# Initialize application with AMQP wire
24+
app = Application(AmqpWire(AMQP_URI))
25+
26+
27+
async def main() -> None:
28+
"""Main publisher routine"""
29+
print("🌤️ Weather Alert Publisher")
30+
print("=" * 50)
31+
print(f"Connecting to: {AMQP_URI}")
32+
33+
# Start the application
34+
await app.start()
35+
print("✅ Connected to AMQP broker")
36+
print()
37+
38+
# Sample weather alerts to publish
39+
alerts = [
40+
WeatherAlert(
41+
location="NYC",
42+
severity=Severity.HIGH,
43+
temperature=95,
44+
description="Heat wave warning in effect. Stay hydrated!",
45+
timestamp=datetime.now(timezone.utc),
46+
),
47+
WeatherAlert(
48+
location="LA",
49+
severity=Severity.LOW,
50+
temperature=72,
51+
description="Sunny and pleasant weather expected.",
52+
timestamp=datetime.now(timezone.utc),
53+
),
54+
WeatherAlert(
55+
location="CHI",
56+
severity=Severity.CRITICAL,
57+
temperature=5,
58+
description="Severe winter storm approaching. Travel not recommended.",
59+
timestamp=datetime.now(timezone.utc),
60+
),
61+
WeatherAlert(
62+
location="MIA",
63+
severity=Severity.MEDIUM,
64+
temperature=88,
65+
description="Scattered thunderstorms expected this afternoon.",
66+
timestamp=datetime.now(timezone.utc),
67+
),
68+
WeatherAlert(
69+
location="SEA",
70+
severity=Severity.LOW,
71+
temperature=65,
72+
description="Light rain throughout the day.",
73+
timestamp=datetime.now(timezone.utc),
74+
),
75+
]
76+
77+
# Publish each alert
78+
print("📡 Publishing weather alerts...")
79+
print()
80+
81+
for alert in alerts:
82+
# The routing key will be dynamically built as: weather.{location}.{severity}
83+
# For example: weather.NYC.high, weather.LA.low, etc.
84+
await app.producer.publish_weather_alert(alert)
85+
86+
print(f"✉️ Published alert:")
87+
print(f" Routing Key: weather.{alert.location}.{alert.severity.value}")
88+
print(f" Location: {alert.location}")
89+
print(f" Severity: {alert.severity.value}")
90+
print(f" Temperature: {alert.temperature}°F")
91+
print(f" Description: {alert.description}")
92+
print()
93+
94+
# Small delay between messages for visibility
95+
await asyncio.sleep(0.5)
96+
97+
print(f"✅ Published {len(alerts)} weather alerts")
98+
print()
99+
100+
# Stop the application
101+
await app.stop()
102+
print("👋 Disconnected from AMQP broker")
103+
104+
105+
if __name__ == "__main__":
106+
asyncio.run(main())

0 commit comments

Comments
 (0)