@nomisRev released this 08 Apr 05:24

0.4.0

This release brings a big new feature, Publishing, along with a lot of minor improvements and fixes to the receiver on the way towards 1.0.

PublisherScope

A PublishScope can offer a message (doesn't await the ack) or publish, which is offer + awaiting the ack.
The block itself waits for all offers inside it to complete, similar to coroutineScope, and re-throws any failed offer.

There is also a transaction block, which wraps its body in the correct transaction semantics and awaits all offers in the same way. Transaction blocks cannot be nested, thanks to @PublisherDSL.

publisher.publishScope {
  offer((1..10).map {
    ProducerRecord(topic.name(), "$it", "msg-$it")
  })
  publish((11..20).map {
    ProducerRecord(topic.name(), "$it", "msg-$it")
  })
  transaction {
    // transaction { } cannot be called here, thanks to DslMarker magic
    offer((21..30).map {
      ProducerRecord(topic.name(), "$it", "msg-$it")
    })
    publish((31..40).map {
      ProducerRecord(topic.name(), "$it", "msg-$it")
    })
  } // Waits until all offers inside the transaction have finished, fails if any failed

  // looping
  (0..100).forEach {
    delay(100.milliseconds)
    val record = ProducerRecord(topic.name(), "$it", "msg-$it")
    offer(record)
  }
  
  // streaming
  (1..100).asFlow()
    .onEach { delay(100.milliseconds) }
    .map { ProducerRecord(topic.name(), "$it", "msg-$it") }
    .collect { offer(it) }
}

See KafkaPublisherSpec for more examples.
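
The examples above assume an already constructed publisher. Below is a minimal sketch of how one could be created, assuming this release exposes a KafkaPublisher factory that takes the same PublisherSettings used in the Flow example further down (the import paths and the close() call are assumptions, not taken from these notes):

import io.github.nomisRev.kafka.publisher.KafkaPublisher
import io.github.nomisRev.kafka.publisher.PublisherSettings
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

suspend fun publishOne(bootstrapServers: String, topic: String) {
  val settings = PublisherSettings(
    bootstrapServers = bootstrapServers,
    keySerializer = StringSerializer(),
    valueSerializer = StringSerializer()
  )
  // Assumption: KafkaPublisher(settings) wraps a KafkaProducer and should be closed when done
  val publisher = KafkaPublisher(settings)
  try {
    publisher.publishScope {
      publish(ProducerRecord(topic, "key-1", "msg-1")) // offer + await ack, as described above
    }
  } finally {
    publisher.close()
  }
}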

Publishing records from a Flow

Often we need to receive/consume events from Kafka and, as a result, publish new events back to Kafka. That typically requires a streaming solution to produce records into Kafka, and keeping track of all published records, their lifecycle, and wiring that back into a stream is tricky. So kotlin-kafka now offers Flow.produce, built in the same style as PublisherScope!

produce will send messages to Kafka and stream Result of RecordMetadata back to the user.
It will not stop sending messages if an error occurs; you can throw the error in the collector if you want the stream to stop. Otherwise, use produceOrThrow.

Any encountered errors will be sent to the collector as Result.failure, and they will also be re-thrown if the Flow completes without them being handled (e.g. using Flow.catch). Check Kafka's Callback documentation for more information on when and which errors are thrown, and which are recoverable.

suspend fun publishMessages(bootStrapServers: String, topic: String) {
  val publisherSettings = PublisherSettings(
    bootstrapServers = bootStrapServers,
    keySerializer = StringSerializer(),
    valueSerializer = StringSerializer()
  )
  (0..10_000).asFlow()
    .onEach { delay(10.milliseconds) }
    .map { index ->
      ProducerRecord<String, String>(topic, index % 4, "$index", "Message $index")
    }.produce(publisherSettings)
    .collect { metadata: Result<RecordMetadata> ->
      metadata
        .onSuccess { println("partition: ${it.partition()}, offset: ${it.offset()}") }
        .onFailure { println("Failed to send: $it") }
    }
}
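
If you want the stream to fail fast rather than collecting Results, the produceOrThrow variant mentioned above can be combined with Flow.catch. A rough sketch, assuming produceOrThrow emits RecordMetadata directly and re-throws the first failed send (the import paths are assumptions):

import io.github.nomisRev.kafka.publisher.PublisherSettings
import io.github.nomisRev.kafka.publisher.produceOrThrow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.map
import org.apache.kafka.clients.producer.ProducerRecord

suspend fun publishOrFail(settings: PublisherSettings<String, String>, topic: String) {
  (0..100).asFlow()
    .map { index -> ProducerRecord(topic, "$index", "Message $index") }
    .produceOrThrow(settings) // assumption: fails the stream on the first send error
    .catch { e -> println("Stream stopped: $e") }
    .collect { metadata -> println("partition: ${metadata.partition()}, offset: ${metadata.offset()}") }
}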

Any and all feedback welcome! ☺️

What's Changed

Full Changelog: 0.3.1...0.4.0