|
1 | 1 | package io.smallrye.reactive.messaging.example.eventclouds; |
2 | 2 |
|
3 | | -import io.reactivex.Flowable; |
4 | | -import io.reactivex.schedulers.Schedulers; |
5 | | -import io.smallrye.reactive.messaging.cloudevents.CloudEventMessage; |
6 | | -import io.smallrye.reactive.messaging.cloudevents.CloudEventMessageBuilder; |
7 | | -import org.eclipse.microprofile.reactive.messaging.Outgoing; |
8 | | -import org.reactivestreams.Publisher; |
9 | | - |
10 | | -import javax.enterprise.context.ApplicationScoped; |
11 | 3 | import java.net.URI; |
12 | 4 | import java.time.ZonedDateTime; |
13 | 5 | import java.util.UUID; |
14 | 6 | import java.util.concurrent.TimeUnit; |
15 | 7 |
|
| 8 | +import javax.enterprise.context.ApplicationScoped; |
| 9 | + |
| 10 | +import org.eclipse.microprofile.reactive.messaging.Outgoing; |
| 11 | +import org.reactivestreams.Publisher; |
| 12 | + |
| 13 | +import io.reactivex.Flowable; |
| 14 | +import io.reactivex.schedulers.Schedulers; |
| 15 | +import io.smallrye.reactive.messaging.cloudevents.CloudEventMessage; |
| 16 | +import io.smallrye.reactive.messaging.cloudevents.CloudEventMessageBuilder; |
| 17 | + |
16 | 18 | @ApplicationScoped |
17 | 19 | public class MyCloudEventSource { |
18 | 20 |
|
19 | 21 | @Outgoing("source") |
20 | 22 | public Publisher<CloudEventMessage<String>> source() { |
21 | 23 | return Flowable.interval(1, TimeUnit.SECONDS) |
22 | | - .observeOn(Schedulers.computation()) |
23 | | - .map(l -> new CloudEventMessageBuilder<String>() |
24 | | - .withId(UUID.randomUUID().toString()) |
25 | | - .withType("counter") |
26 | | - .withSource(new URI("local://timer")) |
27 | | - .withDataContentType("text/plain") |
28 | | - .withTime(ZonedDateTime.now()) |
29 | | - .withData(Long.toString(l)) |
30 | | - .build()); |
| 24 | + .observeOn(Schedulers.computation()) |
| 25 | + .map(l -> new CloudEventMessageBuilder<String>() |
| 26 | + .withId(UUID.randomUUID().toString()) |
| 27 | + .withType("counter") |
| 28 | + .withSource(new URI("local://timer")) |
| 29 | + .withDataContentType("text/plain") |
| 30 | + .withTime(ZonedDateTime.now()) |
| 31 | + .withData(Long.toString(l)) |
| 32 | + .build()); |
31 | 33 | } |
32 | 34 | } |
0 commit comments