kaflow is a simple framework that allows you to build Kafka stream-processing applications in Python with ease.
Some of the features offered by kaflow:
- Dependency Injection system inspired by FastAPI and xpresso, and backed by di.
- Automatic deserialization of incoming messages and serialization of outgoing messages. Supports popular formats like JSON, Avro or Protobuf.
- Message validation thanks to pydantic.
Python 3.8+
pip install kaflow
from kaflow import (
FromHeader,
FromKey,
FromValue,
Json,
Kaflow,
Message,
MessageOffset,
MessagePartition,
MessageTimestamp,
String,
)
from pydantic import BaseModel
class UserClick(BaseModel):
    """Pydantic model with the fields of a user-click message value.

    Used below as the target type of the ``message`` parameter of
    ``consume_user_clicks``.
    """

    # Identifier of the user who clicked.
    user_id: int
    # URL associated with the click.
    url: str
    # Integer timestamp of the click (unit is not specified in this example).
    timestamp: int
class Key(BaseModel):
    """Pydantic model with the fields of a message key.

    Used below as the target type of the ``key`` parameter of
    ``consume_user_clicks``.
    """

    # Name of the environment carried in the key.
    environment: str
# Application object: its ``consume`` decorator registers the handler below
# and ``run`` starts it.
app = Kaflow(
    name="AwesomeKakfaApp",
    brokers="localhost:9092",
)
@app.consume(topic="user_clicks", sink_topics=["user_clicks_json"])
async def consume_user_clicks(
    message: FromValue[Json[UserClick]],
    key: FromKey[Json[Key]],
    x_correlation_id: FromHeader[String[str]],
    x_request_id: FromHeader[String[str]],
    partition: MessagePartition,
    offset: MessageOffset,
    timestamp: MessageTimestamp,
) -> Message:
    """Handle one message consumed from the ``user_clicks`` topic.

    Every parameter is declared with one of kaflow's annotation types
    (``FromValue``, ``FromKey``, ``FromHeader``, ``MessagePartition``,
    ``MessageOffset``, ``MessageTimestamp``). The framework is expected to
    resolve them per message through its dependency-injection system:
    presumably the value and key are JSON-deserialized into ``UserClick``
    and ``Key``, and the two headers are read as strings -- confirm against
    the kaflow documentation.

    Returns:
        A ``Message`` whose value is a fixed JSON payload. Presumably it is
        published to the ``user_clicks_json`` sink topic named in the
        decorator.
    """
    # Do something with the message
    ...
    # Publish to another topic
    return Message(value=b'{"user_clicked": "true"}')
# Start the application; this is the entry point of the example.
app.run()