Releases: gabrielmbmb/kaflow
Releases · gabrielmbmb/kaflow
0.2.2
What's Changed
- 🔥 Remove extra
io.BytesIO
by @gabrielmbmb in #32 - 🔖 Release version
0.2.2
by @gabrielmbmb in #33
Full Changelog: 0.2.1...0.2.2
0.2.1
What's Changed
- ✨ Add
seek_offset
param by @gabrielmbmb in #30 - 🔖 Release version
0.2.1
by @gabrielmbmb in #31
Full Changelog: 0.2.0...0.2.1
0.2.0
✨ Features
- Add a
TestClient
that allows writing unit tests for consumer functions
import pytest
from kaflow import FromValue, Kaflow, Message
from kaflow.testclient import TestClient
app = Kaflow(client_id="test", brokers=["localhost:9092"])
class RandomDependency:
def __init__(self, random: str = "random") -> None:
self.random = random
@app.consume(topic="test_topic", sink_topics=["test_topic_2"])
async def consume_test_topic(
value: FromValue[bytes], dependency: RandomDependency
) -> Message:
print("Value", value)
print("Override dependency:", dependency.random)
return Message(value=b"Hello Unit Test!")
@pytest.fixture
def test_client() -> TestClient:
client = TestClient(app=app)
with app.dependency_overrides:
app.dependency_overrides[RandomDependency] = lambda: RandomDependency(
random="not so random"
)
yield client
def test_consume_test_topic(test_client: TestClient) -> None:
message = test_client.publish(
topic="test_topic",
value=b"Hello World!",
)
assert message.value == b"Hello Unit Test!"
0.1.4
🐛 Fixes
ssl_context
was not being built ifsecurity_protocol=="SASL_SSL"
0.1.3
✨ Features
- Add all possible values for
security_protocol
- Serialize and deserialize Avro messages including schema
0.1.2
🐛 Fixes
- Fix uncaught child exceptions
- Fix
security_protocol
allowed values
🔥 Removed code
- Remove
"Programming Language :: Python :: 3.7"
classifier
0.1.1
Minor release to remove a deprecated argument in aiokafka
.
🔥 Removed code
- Remove passing
loop
argument toaiokafka.{AIOKafkaConsumer, AIOKafkaProducer}
0.1.0
✨ Features
- Better data extraction from message:
- Extract data from the value of the message using
FromValue[...]
- Extract data from the key of the message using
FromKey[...]
- Extract data from the headers of the message using
FromHeader[...]
from pydantic import BaseModel from kaflow import FromHeader, FromKey, FromValue, Json, Kaflow class Stock(BaseModel): name: str price: float class Key(BaseModel): key: str class Header1(BaseModel): header: str class Header2(BaseModel): header: str app = Kaflow(client_id="kaflow", brokers="localhost:9092") @app.consume(topic="stock") async def consume_stock( value: FromValue[Json[Stock]], key: FromKey[Json[Key]], header_1: FromHeader[Json[Header1]], header_2: FromHeader[Json[Header2]], ) -> None: print("value", value) print("key", key) print("header 1", header_1) print("header 2", header_2) app.run()
- Extract data from the value of the message using
- Add
String
serializer - Add
aiokafka.{AIOKafkaProducer, AIOKafkaConsumer}
parameters toKaflow.__init__
method
0.0.2
The first release of Kaflow
! (I published 0.0.1
by error in PyPI 😅)