Skip to content

Commit c62e8c7

Browse files
authored
Merge pull request #688 from CleverCloud/push-urnuyrzvnowk
Rebased arendsyl’s PR to fix actions
2 parents 80578df + a3f7f8a commit c62e8c7

File tree

7 files changed

+93
-33
lines changed

7 files changed

+93
-33
lines changed

pulsar4s-cats-effect/src/main/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandler.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ trait CatsAsyncHandlerLowPriority {
2525

2626
object CompletableFutureConverters extends Logging {
2727

28-
implicit class CompletableOps[F[_]: Async, T](f: => F[CompletableFuture[T]]) {
28+
implicit class CompletableOps[F[_] : Async, T](f: => F[CompletableFuture[T]]) {
2929
def liftF: F[T] = {
3030
f.flatMap { f =>
3131
Async[F].defer {
@@ -62,7 +62,7 @@ trait CatsAsyncHandlerLowPriority {
6262
}
6363
}
6464

65-
implicit def asyncHandlerForCatsEffectAsync[F[_]: Async]: AsyncHandler[F] = new AsyncHandler[F] with Logging {
65+
implicit def asyncHandlerForCatsEffectAsync[F[_] : Async]: AsyncHandler[F] = new AsyncHandler[F] with Logging {
6666

6767
import CompletableFutureConverters._
6868

@@ -114,6 +114,10 @@ trait CatsAsyncHandlerLowPriority {
114114
consumer.seekAsync(messageId)
115115
}.liftF.void
116116

117+
override def seekAsync(consumer: JConsumer[_], timestamp: Long): F[Unit] = Async[F].delay {
118+
consumer.seekAsync(timestamp)
119+
}.liftF.void
120+
117121
override def seekAsync(reader: api.Reader[_], messageId: MessageId): F[Unit] = Async[F].delay {
118122
reader.seekAsync(messageId)
119123
}.liftF.void
@@ -139,7 +143,8 @@ trait CatsAsyncHandlerLowPriority {
139143
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] =
140144
Async[F].delay(consumer.acknowledgeCumulativeAsync(messageId)).liftF.void
141145

142-
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): F[Unit] =
146+
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId,
147+
txn: Transaction): F[Unit] =
143148
Async[F].delay(consumer.acknowledgeCumulativeAsync(messageId, txn)).liftF.void
144149

145150
override def negativeAcknowledgeAsync[T](consumer: api.Consumer[T], messageId: MessageId): F[Unit] =
@@ -173,9 +178,9 @@ trait CatsAsyncHandlerLowPriority {
173178
Async[F].delay(builder.sendAsync()).liftF.map(MessageId.fromJava)
174179

175180
override def withTransaction[E, A](
176-
builder: api.transaction.TransactionBuilder,
177-
action: TransactionContext => F[Either[E, A]]
178-
): F[Either[E, A]] = {
181+
builder: api.transaction.TransactionBuilder,
182+
action: TransactionContext => F[Either[E, A]]
183+
): F[Either[E, A]] = {
179184
Resource.makeCase(startTransaction(builder)) { (txn, exitCase) =>
180185
if (exitCase == ExitCase.Succeeded) Async[F].unit else txn.abort
181186
}.use { txn =>
@@ -187,7 +192,9 @@ trait CatsAsyncHandlerLowPriority {
187192

188193
def startTransaction(builder: api.transaction.TransactionBuilder): F[TransactionContext] =
189194
Async[F].delay(builder.build()).liftF.map(TransactionContext(_))
195+
190196
def commitTransaction(txn: Transaction): F[Unit] = Async[F].delay(txn.commit()).liftF.map(_ => ())
197+
191198
def abortTransaction(txn: Transaction): F[Unit] = Async[F].delay(txn.abort()).liftF.map(_ => ())
192199

193200
}

pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/AsyncHandler.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ trait AsyncHandler[F[_]] {
3131
def flush(producer: api.Producer[_]): F[Unit]
3232

3333
def seekAsync(consumer: api.Consumer[_], messageId: MessageId): F[Unit]
34+
def seekAsync(consumer: api.Consumer[_], timestamp: Long): F[Unit]
3435

3536
def seekAsync(reader: api.Reader[_], messageId: MessageId): F[Unit]
3637
def seekAsync(reader: api.Reader[_], timestamp: Long): F[Unit]

pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/Consumer.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import org.apache.pulsar.client.api.ConsumerStats
55
import org.apache.pulsar.client.api.transaction.Transaction
66

77
import java.io.Closeable
8+
import java.time.Instant
89
import java.util.concurrent.TimeUnit
910
import scala.concurrent.duration.FiniteDuration
1011
import scala.util.Try
@@ -60,9 +61,17 @@ trait Consumer[T] extends Closeable with TransactionalConsumerOps[T] {
6061
def redeliverUnacknowledgedMessages(): Unit
6162

6263
def seek(messageId: MessageId): Unit
64+
def seek(timestamp: Long): Unit
65+
def seek(timestamp: Instant): Unit =
66+
seek(timestamp.toEpochMilli)
67+
6368
def seekEarliest(): Unit = seek(MessageId.earliest)
6469
def seekLatest(): Unit = seek(MessageId.latest)
70+
6571
def seekAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit]
72+
def seekAsync[F[_] : AsyncHandler](timestamp: Long): F[Unit]
73+
def seekAsync[F[_] : AsyncHandler](timestamp: Instant): F[Unit] =
74+
seekAsync[F](timestamp.toEpochMilli)
6675

6776
def getLastMessageId(): MessageId
6877

@@ -137,9 +146,14 @@ class DefaultConsumer[T](consumer: JConsumer[T]) extends Consumer[T] with Loggin
137146

138147
override def seek(messageId: MessageId): Unit = consumer.seek(messageId)
139148

149+
override def seek(timestamp: Long): Unit = consumer.seek(timestamp)
150+
140151
override def seekAsync[F[_] : AsyncHandler](messageId: MessageId): F[Unit] =
141152
implicitly[AsyncHandler[F]].seekAsync(consumer, messageId)
142-
153+
154+
override def seekAsync[F[_] : AsyncHandler](timestamp: Long): F[Unit] =
155+
implicitly[AsyncHandler[F]].seekAsync(consumer, timestamp)
156+
143157
override def getLastMessageId(): MessageId = consumer.getLastMessageId()
144158

145159
override def getLastMessageIdAsync[F[_] : AsyncHandler]: F[MessageId] = implicitly[AsyncHandler[F]].getLastMessageId(consumer)

pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/FutureAsyncHandler.scala

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,20 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut
4545
}
4646

4747
override def close(producer: api.Producer[_]): Future[Unit] = producer.closeAsync().toScala
48+
4849
override def close(consumer: api.Consumer[_]): Future[Unit] = consumer.closeAsync().toScala
50+
4951
override def close(client: api.PulsarClient): Future[Unit] = client.closeAsync().toScala
5052

5153
override def seekAsync(consumer: api.Consumer[_], messageId: MessageId): Future[Unit] =
5254
consumer.seekAsync(messageId).toScala
53-
55+
56+
override def seekAsync(consumer: api.Consumer[_], timestamp: Long): Future[Unit] =
57+
consumer.seekAsync(timestamp).toScala
58+
5459
override def seekAsync(reader: api.Reader[_], messageId: MessageId): Future[Unit] =
5560
reader.seekAsync(messageId).toScala
56-
61+
5762
override def seekAsync(reader: api.Reader[_], timestamp: Long): Future[Unit] =
5863
reader.seekAsync(timestamp).toScala
5964

@@ -73,13 +78,15 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut
7378
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Future[Unit] =
7479
consumer.acknowledgeCumulativeAsync(messageId).toScala
7580

76-
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Future[Unit] =
81+
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId,
82+
txn: Transaction): Future[Unit] =
7783
consumer.acknowledgeCumulativeAsync(messageId, txn).toScala
7884

7985
override def negativeAcknowledgeAsync[T](consumer: JConsumer[T], messageId: MessageId): Future[Unit] =
8086
Future.successful(consumer.negativeAcknowledge(messageId))
8187

8288
override def close(reader: api.Reader[_]): Future[Unit] = reader.closeAsync().toScala
89+
8390
override def flush(producer: api.Producer[_]): Future[Unit] = producer.flushAsync().toScala
8491

8592
override def nextAsync[T](reader: api.Reader[T]): Future[ConsumerMessage[T]] =
@@ -92,9 +99,9 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut
9299
builder.sendAsync().toScala.map(MessageId.fromJava)
93100

94101
override def withTransaction[E, A](
95-
builder: api.transaction.TransactionBuilder,
96-
action: TransactionContext => Future[Either[E, A]]
97-
): Future[Either[E, A]] = {
102+
builder: api.transaction.TransactionBuilder,
103+
action: TransactionContext => Future[Either[E, A]]
104+
): Future[Either[E, A]] = {
98105
startTransaction(builder).flatMap { txn =>
99106
Future.unit.flatMap(_ => action(txn)).transformWith {
100107
case Success(Right(value)) =>
@@ -107,7 +114,9 @@ class FutureAsyncHandler(implicit ec: ExecutionContext) extends AsyncHandler[Fut
107114

108115
override def startTransaction(builder: api.transaction.TransactionBuilder): Future[TransactionContext] =
109116
builder.build().toScala.map(TransactionContext(_))
117+
110118
override def commitTransaction(txn: Transaction): Future[Unit] = txn.commit().toScala.map(_ => ())
119+
111120
override def abortTransaction(txn: Transaction): Future[Unit] = txn.abort().toScala.map(_ => ())
112-
121+
113122
}

pulsar4s-monix/src/main/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandler.scala

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,18 @@ class MonixAsyncHandler extends AsyncHandler[Task] {
5757
def unsubscribeAsync(consumer: api.Consumer[_]): Task[Unit] = consumer.unsubscribeAsync()
5858

5959
override def close(producer: api.Producer[_]): Task[Unit] = producer.closeAsync()
60+
6061
override def close(consumer: api.Consumer[_]): Task[Unit] = consumer.closeAsync()
6162

6263
override def seekAsync(consumer: api.Consumer[_], messageId: MessageId): Task[Unit] =
6364
consumer.seekAsync(messageId)
64-
65+
66+
override def seekAsync(consumer: api.Consumer[_], timestamp: Long): Task[Unit] =
67+
consumer.seekAsync(timestamp)
68+
6569
override def seekAsync(reader: api.Reader[_], messageId: MessageId): Task[Unit] =
6670
reader.seekAsync(messageId)
67-
71+
6872
override def seekAsync(reader: api.Reader[_], timestamp: Long): Task[Unit] =
6973
reader.seekAsync(timestamp)
7074

@@ -86,13 +90,17 @@ class MonixAsyncHandler extends AsyncHandler[Task] {
8690
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] =
8791
consumer.acknowledgeCumulativeAsync(messageId)
8892

89-
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Task[Unit] =
93+
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId,
94+
txn: Transaction): Task[Unit] =
9095
consumer.acknowledgeCumulativeAsync(messageId, txn)
9196

9297
override def negativeAcknowledgeAsync[T](consumer: Consumer[T], messageId: MessageId): Task[Unit] =
93-
Task { consumer.negativeAcknowledge(messageId) }
98+
Task {
99+
consumer.negativeAcknowledge(messageId)
100+
}
94101

95102
override def close(reader: Reader[_]): Task[Unit] = reader.closeAsync()
103+
96104
override def close(client: PulsarClient): Task[Unit] = client.closeAsync()
97105

98106
override def flush(producer: api.Producer[_]): Task[Unit] = producer.flushAsync()
@@ -107,9 +115,9 @@ class MonixAsyncHandler extends AsyncHandler[Task] {
107115
Task.deferFuture(builder.sendAsync()).map(MessageId.fromJava)
108116

109117
override def withTransaction[E, A](
110-
builder: api.transaction.TransactionBuilder,
111-
action: TransactionContext => Task[Either[E, A]]
112-
): Task[Either[E, A]] = {
118+
builder: api.transaction.TransactionBuilder,
119+
action: TransactionContext => Task[Either[E, A]]
120+
): Task[Either[E, A]] = {
113121
startTransaction(builder).bracketCase { txn =>
114122
action(txn).flatMap { result =>
115123
(if (result.isRight) txn.commit(this) else txn.abort(this)).map(_ => result)
@@ -119,8 +127,10 @@ class MonixAsyncHandler extends AsyncHandler[Task] {
119127

120128
override def startTransaction(builder: api.transaction.TransactionBuilder): Task[TransactionContext] =
121129
Task.deferFuture(builder.build()).map(TransactionContext(_))
130+
122131
override def commitTransaction(txn: Transaction): Task[Unit] =
123132
Task.deferFuture(txn.commit()).map(_ => ())
133+
124134
override def abortTransaction(txn: Transaction): Task[Unit] =
125135
Task.deferFuture(txn.abort()).map(_ => ())
126136
}

pulsar4s-scalaz/src/main/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandler.scala

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,13 @@ class ScalazAsyncHandler extends AsyncHandler[Task] {
5454

5555
override def seekAsync(consumer: api.Consumer[_], messageId: MessageId): Task[Unit] =
5656
consumer.seekAsync(messageId)
57-
57+
58+
override def seekAsync(consumer: api.Consumer[_], timestamp: Long): Task[Unit] =
59+
consumer.seekAsync(timestamp)
60+
5861
override def seekAsync(reader: api.Reader[_], messageId: MessageId): Task[Unit] =
5962
reader.seekAsync(messageId)
60-
63+
6164
override def seekAsync(reader: api.Reader[_], timestamp: Long): Task[Unit] =
6265
reader.seekAsync(timestamp)
6366

@@ -77,15 +80,21 @@ class ScalazAsyncHandler extends AsyncHandler[Task] {
7780
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] =
7881
consumer.acknowledgeCumulativeAsync(messageId)
7982

80-
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Task[Unit] =
83+
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId,
84+
txn: Transaction): Task[Unit] =
8185
consumer.acknowledgeCumulativeAsync(messageId, txn)
8286

8387
override def negativeAcknowledgeAsync[T](consumer: Consumer[T], messageId: MessageId): Task[Unit] =
84-
Task { consumer.negativeAcknowledge(messageId) }
88+
Task {
89+
consumer.negativeAcknowledge(messageId)
90+
}
8591

8692
override def close(reader: Reader[_]): Task[Unit] = reader.closeAsync()
93+
8794
override def close(producer: api.Producer[_]): Task[Unit] = producer.closeAsync()
95+
8896
override def close(consumer: api.Consumer[_]): Task[Unit] = consumer.closeAsync()
97+
8998
override def close(client: PulsarClient): Task[Unit] = client.closeAsync()
9099

91100
override def flush(producer: api.Producer[_]): Task[Unit] = producer.flushAsync()
@@ -100,11 +109,12 @@ class ScalazAsyncHandler extends AsyncHandler[Task] {
100109
builder.sendAsync().map(MessageId.fromJava)
101110

102111
override def withTransaction[E, A](
103-
builder: api.transaction.TransactionBuilder,
104-
action: TransactionContext => Task[Either[E, A]]
105-
): Task[Either[E, A]] = {
112+
builder: api.transaction.TransactionBuilder,
113+
action: TransactionContext => Task[Either[E, A]]
114+
): Task[Either[E, A]] = {
106115
def close[T](txn: TransactionContext, commit: Boolean, result: T): Task[T] =
107116
(if (commit) txn.commit(this) else txn.abort(this)).map(_ => result)
117+
108118
startTransaction(builder).flatMap { txn =>
109119
action(txn)
110120
.flatMap(result => (if (result.isRight) txn.commit(this) else txn.abort(this)).map(_ => result))
@@ -114,7 +124,9 @@ class ScalazAsyncHandler extends AsyncHandler[Task] {
114124

115125
override def startTransaction(builder: api.transaction.TransactionBuilder): Task[TransactionContext] =
116126
builder.build().map(TransactionContext(_))
127+
117128
override def commitTransaction(txn: Transaction): Task[Unit] = txn.commit()
129+
118130
override def abortTransaction(txn: Transaction): Task[Unit] = txn.abort()
119131
}
120132

pulsar4s-zio/src/main/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandler.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ class ZioAsyncHandler extends AsyncHandler[Task] {
3939
fromFuture(ZIO.attempt(consumer.receiveAsync())) flatMap (v => ZIO.attempt(ConsumerMessage.fromJava(v)))
4040

4141
override def receiveBatch[T](consumer: Consumer[T]): Task[Vector[ConsumerMessage[T]]] =
42-
fromFuture(ZIO.attempt(consumer.batchReceiveAsync())) flatMap (v => ZIO.attempt(v.asScala.map(ConsumerMessage.fromJava).toVector))
42+
fromFuture(ZIO.attempt(consumer.batchReceiveAsync())) flatMap (v => ZIO
43+
.attempt(v.asScala.map(ConsumerMessage.fromJava).toVector))
4344

4445
override def getLastMessageId[T](consumer: api.Consumer[T]): Task[MessageId] =
4546
fromFuture(ZIO.attempt(consumer.getLastMessageIdAsync)) flatMap (v => ZIO.attempt(MessageId.fromJava(v)))
@@ -62,6 +63,9 @@ class ZioAsyncHandler extends AsyncHandler[Task] {
6263
override def seekAsync(consumer: api.Consumer[_], messageId: MessageId): Task[Unit] =
6364
fromFuture(ZIO.attempt(consumer.seekAsync(messageId))).unit
6465

66+
override def seekAsync(consumer: api.Consumer[_], timestamp: Long): Task[Unit] =
67+
fromFuture(ZIO.attempt(consumer.seekAsync(timestamp))).unit
68+
6569
override def seekAsync(reader: api.Reader[_], messageId: MessageId): Task[Unit] =
6670
fromFuture(ZIO.attempt(reader.seekAsync(messageId))).unit
6771

@@ -86,16 +90,17 @@ class ZioAsyncHandler extends AsyncHandler[Task] {
8690
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId): Task[Unit] =
8791
fromFuture(ZIO.attempt(consumer.acknowledgeCumulativeAsync(messageId))).unit
8892

89-
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId, txn: Transaction): Task[Unit] =
93+
override def acknowledgeCumulativeAsync[T](consumer: api.Consumer[T], messageId: MessageId,
94+
txn: Transaction): Task[Unit] =
9095
fromFuture(ZIO.attempt(consumer.acknowledgeCumulativeAsync(messageId, txn))).unit
9196

9297
override def negativeAcknowledgeAsync[T](consumer: Consumer[T], messageId: MessageId): Task[Unit] =
9398
ZIO.attempt(consumer.negativeAcknowledge(messageId))
9499

95100
override def withTransaction[E, A](
96-
builder: api.transaction.TransactionBuilder,
97-
action: TransactionContext => Task[Either[E, A]]
98-
): Task[Either[E, A]] = {
101+
builder: api.transaction.TransactionBuilder,
102+
action: TransactionContext => Task[Either[E, A]]
103+
): Task[Either[E, A]] = {
99104
ZIO.acquireReleaseExitWith[Any, Throwable, TransactionContext](startTransaction(builder))(
100105
(txn: TransactionContext, e: Exit[Throwable, Either[E, A]]) => (txn, e) match {
101106
case (txn, Exit.Success(Right(_))) => txn.commit(this).ignore
@@ -106,7 +111,9 @@ class ZioAsyncHandler extends AsyncHandler[Task] {
106111

107112
override def startTransaction(builder: api.transaction.TransactionBuilder): Task[TransactionContext] =
108113
fromFuture(ZIO.attempt(builder.build())).map(TransactionContext(_))
114+
109115
override def commitTransaction(txn: Transaction): Task[Unit] = fromFuture(ZIO.attempt(txn.commit())).unit
116+
110117
override def abortTransaction(txn: Transaction): Task[Unit] = fromFuture(ZIO.attempt(txn.abort())).unit
111118
}
112119

0 commit comments

Comments
 (0)