Skip to content

Commit

Permalink
Add singleTopic in CommittableOffsetBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
geirolz committed Jul 29, 2022
1 parent 7b8f807 commit 3601e49
Showing 1 changed file with 33 additions and 2 deletions.
35 changes: 33 additions & 2 deletions modules/core/src/main/scala/fs2/kafka/CommittableOffsetBatch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ sealed abstract class CommittableOffsetBatch[F[_]] {
}

object CommittableOffsetBatch {

private[kafka] def apply[F[_]](
offsets: Map[TopicPartition, OffsetAndMetadata],
consumerGroupIds: Set[String],
Expand Down Expand Up @@ -140,7 +141,7 @@ object CommittableOffsetBatch {

override def commit: F[Unit] =
if (_consumerGroupIdsMissing)
F.raiseError(ConsumerGroupException(consumerGroupIds))
ApplicativeThrow[F].raiseError(ConsumerGroupException(consumerGroupIds))
else {
offsetsByTopic
.map {
Expand All @@ -149,7 +150,7 @@ object CommittableOffsetBatch {
.getOrElse[Map[TopicPartition, OffsetAndMetadata] => F[Unit]](
topicName,
_ =>
F.raiseError(
ApplicativeThrow[F].raiseError(
new RuntimeException(s"Cannot perform commit for topic: $topicName")
)
)
Expand All @@ -164,6 +165,36 @@ object CommittableOffsetBatch {
}
}

@deprecated("Use CommittableOffsetBatch.apply with commitMap instead.", since = "2.5.1")
private[kafka] def apply[F[_]](
offsets: Map[TopicPartition, OffsetAndMetadata],
consumerGroupIds: Set[String],
consumerGroupIdsMissing: Boolean,
commit: Map[TopicPartition, OffsetAndMetadata] => F[Unit]
)(implicit F: ApplicativeError[F, Throwable]): CommittableOffsetBatch[F] =
apply[F](
offsets,
consumerGroupIds,
consumerGroupIdsMissing,
offsets.headOption
.map(_._1.topic())
.map(topicName => Map(topicName -> commit))
.getOrElse(Map.empty)
)

/**
* A [[CommittableOffsetBatch]] which does include only one offset for a single topic.
*
* @param topicPartition Topic partition information
* @param offsetAndMetadata Offset and metadata
* @param consumerGroupId Consumer group id
* @param commit effect to perform to commit this offset
* @tparam F effect type to use to perform the commit effect
* @return A [[CommittableOffsetBatch]] which does include only one offset for a single topic.
*
* @see [[CommittableOffsetBatch#fromFoldable]]
* @see [[CommittableOffsetBatch#fromFoldableOption]]
*/
def one[F[_]: ApplicativeThrow](
topicPartition: TopicPartition,
offsetAndMetadata: OffsetAndMetadata,
Expand Down

0 comments on commit 3601e49

Please sign in to comment.