From e41743c3980f52602619fb190be14ce547eebe8a Mon Sep 17 00:00:00 2001 From: Christoph Stumpf Date: Tue, 28 Nov 2017 11:28:39 +0100 Subject: [PATCH] Upgrade to Akka 2.5.7 (#401) Akka 2.5.7 introduced the following incompatible changes in the Java-API - ReceiveBuilder: - explicit create step required - builds `AbstractActor.Receive` instead of `Actor.Receive` - Match.match: Changes in type parameters To adjust to these changes the Java-API of Eventuate is streamlined and mirrors the new Akka Java-API by removing all `setOn...`-methods used to defined the actor's behavior and replaces these methods with `createOn...`-variants which can be used to define custom behavior by returning the behavior definition wrapped in an instance of `AbstractActor.Receive`. New Java-specific accessor methods are introduced to access the properties of any event-sourced component with the Java-API. `AbstractEventsourcedProcessor` supports behavior definition by overriding the `createOnProcess` method which facilitates a Java-specific `Process` type to define custom behavior. The `ProcessBuilder` may be used to create instances of type `Process`. A Java-specific `BehaviorContext` is added to the Java-API of al all event-sourced components which can be retrieved by calling `getContext()`. --- .../eventuate/ProcessBuilder.java | 103 ++++-- .../AbstractEventsourcedProcessor.scala | 58 ++-- ...bstractEventsourcedStatefulProcessor.scala | 2 +- .../eventuate/AbstractEventsourcedView.scala | 296 ++++++++++++------ .../AbstractEventsourcedActorSpec.java | 29 +- .../AbstractEventsourcedProcessorSpec.java | 9 +- .../AbstractEventsourcedViewSpec.java | 76 +++-- .../AbstractEventsourcedWriterSpec.java | 16 +- .../vertx/japi/VertxAdapterExample.java | 26 +- .../example/japi/ordermgnt/OrderActor.java | 107 ++++--- .../example/japi/ordermgnt/OrderExample.java | 7 +- .../example/japi/ordermgnt/OrderManager.java | 36 ++- .../example/japi/ordermgnt/OrderView.java | 24 +- .../example/japi/querydb/Emitter.java | 48 +-- .../example/japi/querydb/Writer.java | 15 +- project/ProjectDependencies.scala | 2 +- .../code/userguide/japi/ActorExample.java | 32 +- .../userguide/japi/CommunicationExample.java | 34 +- .../userguide/japi/ConcurrentExample.java | 53 ++-- .../userguide/japi/ConditionalExample.java | 31 +- .../code/userguide/japi/ResolveExample.java | 89 +++--- .../code/userguide/japi/TrackingExample.java | 33 +- .../code/userguide/japi/ViewExample.java | 24 +- 23 files changed, 709 insertions(+), 441 deletions(-) diff --git a/eventuate-core/src/main/java/com/rbmhtechnology/eventuate/ProcessBuilder.java b/eventuate-core/src/main/java/com/rbmhtechnology/eventuate/ProcessBuilder.java index 25fe690a..a66a7085 100644 --- a/eventuate-core/src/main/java/com/rbmhtechnology/eventuate/ProcessBuilder.java +++ b/eventuate-core/src/main/java/com/rbmhtechnology/eventuate/ProcessBuilder.java @@ -17,66 +17,115 @@ package com.rbmhtechnology.eventuate; import akka.japi.pf.FI; -import akka.japi.pf.Match; import akka.japi.pf.PFBuilder; /** - * Java API for building a PartialFunction that matches arbitrary [[Object]]s to {@link Iterable}s. + * Java API for building a {@link AbstractEventsourcedProcessor.Process} behavior that matches arbitrary [[Object]]s to {@link Iterable}s. * - * Can be used to define processing behaviour in {@link AbstractEventsourcedProcessor#setOnProcessEvent} - * or {@link AbstractEventsourcedProcessor#onProcessEvent}. + * Used to define processing behaviour in {@link AbstractEventsourcedProcessor#createOnProcessEvent}. */ public final class ProcessBuilder { - private ProcessBuilder() { + + private final PFBuilder> underlying; + + private ProcessBuilder(PFBuilder> underlying) { + this.underlying = underlying; + } + + /** + * Returns a new {@link ProcessBuilder} instance. + * + * @return a {@link ProcessBuilder} + */ + public static ProcessBuilder create() { + return new ProcessBuilder(new PFBuilder<>()); } /** - * Returns a new {@link PFBuilder} of {@link java.lang.Iterable} with a case statement added. + * Add a new case statement to this builder. * * @param type the type to match the argument against - * @param apply the function to apply for the given argument - must return {@link java.lang.Iterable} + * @param apply the function to apply for the given argument - must return {@link Iterable} * @param

the type of the argument - * @return a builder with the case statement added + * @return the {@link ProcessBuilder} with the case statement added */ - public static

PFBuilder> match(final Class type, final IterableApply apply) { - return Match.match(type, apply); + public

ProcessBuilder match(final Class

type, IterableApply

apply) { + underlying.match(type, apply); + return this; } /** - * Returns a new {@link PFBuilder} of {@link java.lang.Iterable} with a case statement added. + * Add a new case statement to this builder. * * @param type the type to match the argument against * @param predicate the predicate to match the argument against - * @param apply the function to apply for the given argument - must return {@link java.lang.Iterable} + * @param apply the function to apply for the given argument - must return {@link Iterable} * @param

the type of the argument - * @return a builder with the case statement added + * @return the {@link ProcessBuilder} with the case statement added + */ + public

ProcessBuilder match(final Class

type, final FI.TypedPredicate

predicate, final IterableApply

apply) { + underlying.match(type, predicate, apply); + return this; + } + + /** + * Add a new case statement to this builder without compile time type check of the parameters. + * Should normally not be used, but when matching on class with generic type. + * + * @param type the type to match the argument against + * @param apply the function to apply for the given argument - must return {@link Iterable} + * @return the {@link ProcessBuilder} with the case statement added + */ + public ProcessBuilder matchUnchecked(final Class type, IterableApply apply) { + underlying.matchUnchecked(type, apply); + return this; + } + + /** + * Add a new case statement to this builder without compile time type check of the parameters. + * Should normally not be used, but when matching on class with generic type. + * + * @param type the type to match the argument against + * @param predicate a predicate that will be evaluated on the argument if the type matches + * @param apply the function to apply for the given argument - must return {@link Iterable} + * @return the {@link ProcessBuilder} with the case statement added */ - public static

PFBuilder> match(final Class type, - final FI.TypedPredicate predicate, - final IterableApply apply) { - return Match.match(type, predicate, apply); + public ProcessBuilder matchUnchecked(final Class type, final FI.TypedPredicate predicate, final IterableApply apply) { + underlying.matchUnchecked(type, predicate, apply); + return this; } /** - * Returns a new {@link PFBuilder} of {@link java.lang.Iterable} with a case statement added. + * Add a new case statement to this builder. * * @param object the object to match the argument against - * @param apply the function to apply for the given argument - must return an {@link java.lang.Iterable} + * @param apply the function to apply for the given argument - must return an {@link Iterable} * @param

the type of the argument - * @return a builder with the case statement added + * @return the {@link ProcessBuilder} with the case statement added + */ + public

ProcessBuilder matchEquals(final P object, final IterableApply

apply) { + underlying.matchEquals(object, apply); + return this; + } + + /** + * Add a new case statement to this builder, that matches any argument. + * + * @param apply the function to apply for the given argument - must return an {@link Iterable} + * @return the {@link ProcessBuilder} with the case statement added */ - public static

PFBuilder> matchEquals(final P object, final IterableApply

apply) { - return Match.matchEquals(object, apply); + public ProcessBuilder matchAny(final IterableApply apply) { + underlying.matchAny(apply); + return this; } /** - * Returns a new {@link PFBuilder} of {@link java.lang.Iterable} with a default case statement added. + * Builds the resulting processing behavior as an instance of {@link AbstractEventsourcedProcessor.Process}. * - * @param apply the function to apply for the given argument - must return an {@link java.lang.Iterable} - * @return a builder with the case statement added + * @return the configured {@link AbstractEventsourcedProcessor.Process} */ - public static PFBuilder> matchAny(final IterableApply apply) { - return Match.matchAny(apply); + public AbstractEventsourcedProcessor.Process build() { + return new AbstractEventsourcedProcessor.Process(underlying.build()); } public interface IterableApply extends FI.Apply> { diff --git a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/AbstractEventsourcedProcessor.scala b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/AbstractEventsourcedProcessor.scala index 4c2b32c9..12e341ed 100644 --- a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/AbstractEventsourcedProcessor.scala +++ b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/AbstractEventsourcedProcessor.scala @@ -21,6 +21,7 @@ import java.util.{ Optional => JOption } import akka.actor.ActorRef +import scala.collection.immutable._ import scala.collection.JavaConverters._ import scala.compat.java8.OptionConverters._ @@ -32,47 +33,25 @@ private[eventuate] trait EventsourcedProcessorHandlers { private[eventuate] trait EventSourcedProcessorAdapter extends EventsourcedProcessorHandlers with EventsourcedWriterFailureHandlerAdapter { this: EventsourcedProcessor => - type JProcess = PartialFunction[Any, JIterable[Any]] - - object JProcess { - object emptyBehavior extends JProcess { - def isDefinedAt(x: Any) = false - def apply(x: Any) = throw new UnsupportedOperationException("Empty process behavior apply()") - } - } - - private var processBehaviour: Option[JProcess] = None - - abstract override final def processEvent: Process = - onProcessEvent.andThen(seq => seq.asScala.to[collection.immutable.Seq]) - /** * Java API of [[EventsourcedProcessor.processEvent event-processing]] handler. * - * Returns a partial function that defines the actor's event processing behaviour. + * Returns a [[AbstractEventsourcedProcessor.Process]] that defines the actor's event processing behaviour. * Use [[ProcessBuilder]] to define the behaviour. * - * Takes precedence over [[setOnProcessEvent]]. - * * @see [[EventsourcedProcessor]] */ - def onProcessEvent: JProcess = processBehaviour.getOrElse(JProcess.emptyBehavior) + def createOnProcessEvent(): AbstractEventsourcedProcessor.Process = + AbstractEventsourcedProcessor.emptyProcessBehavior + + abstract override final def processEvent: Process = + createOnProcessEvent().asScala /** - * Java API that sets this actor's [[EventsourcedProcessor.processEvent event-processing]] handler. - * - * Supplied with a partial function that defines the actor's event processing behaviour. - * Use [[ProcessBuilder]] to define the behaviour. - * - * If [[onProcessEvent]] is implemented, the supplied behaviour is ignored. - * - * @param handler This actor's event processing handler. - * @see [[EventsourcedProcessor]] + * creates a new empty [[ProcessBuilder]] */ - def setOnProcessEvent(handler: JProcess): Unit = - if (processBehaviour.isEmpty) processBehaviour = Some(handler) - else throw new IllegalStateException("Actor process behaviour has already been set with setOnProcessEvent(...). " + - "The behaviour can only be set once.") + final def processBuilder(): ProcessBuilder = + ProcessBuilder.create() } private[eventuate] trait EventsourcedProcessorWriteSuccessHandlerAdapter extends EventsourcedWriterSuccessHandlers[Long, Long] { @@ -90,13 +69,28 @@ private[eventuate] trait EventsourcedProcessorWriteSuccessHandlerAdapter extends super.writeSuccess(result) } +/** + * Java API + */ +object AbstractEventsourcedProcessor { + + final class Process(val behaviour: PartialFunction[Any, JIterable[Any]]) + + final val emptyProcessBehavior: Process = new Process(PartialFunction.empty) + + implicit class ProcessConverter(p: Process) { + def asScala: EventsourcedProcessor.Process = + p.behaviour.andThen(seq => seq.asScala.to[Seq]) + } +} + /** * Java API for actors that implement [[EventsourcedProcessor]]. * * @see [[AbstractEventsourcedView]] for a detailed usage of the Java API * @see [[EventsourcedProcessor]] */ -class AbstractEventsourcedProcessor(id: String, eventLog: ActorRef, val targetEventLog: ActorRef) extends AbstractEventsourcedView(id, eventLog) +class AbstractEventsourcedProcessor(val id: String, val eventLog: ActorRef, val targetEventLog: ActorRef) extends AbstractEventsourcedComponent with EventsourcedProcessor with EventSourcedProcessorAdapter with EventsourcedProcessorWriteSuccessHandlerAdapter { override final def readSuccess(result: Long): Option[Long] = diff --git a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/AbstractEventsourcedStatefulProcessor.scala b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/AbstractEventsourcedStatefulProcessor.scala index b7614ec7..de229c0a 100644 --- a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/AbstractEventsourcedStatefulProcessor.scala +++ b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/AbstractEventsourcedStatefulProcessor.scala @@ -24,6 +24,6 @@ import akka.actor.ActorRef * @see [[AbstractEventsourcedView]] for a detailed usage of the Java API * @see [[StatefulProcessor]] */ -class AbstractEventsourcedStatefulProcessor(id: String, eventLog: ActorRef, val targetEventLog: ActorRef) extends AbstractEventsourcedView(id, eventLog) +class AbstractEventsourcedStatefulProcessor(val id: String, val eventLog: ActorRef, val targetEventLog: ActorRef) extends AbstractEventsourcedComponent with StatefulProcessor with EventSourcedProcessorAdapter with EventsourcedProcessorWriteSuccessHandlerAdapter { } diff --git a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/AbstractEventsourcedView.scala b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/AbstractEventsourcedView.scala index b6be4ffd..6a08995e 100644 --- a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/AbstractEventsourcedView.scala +++ b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/AbstractEventsourcedView.scala @@ -18,49 +18,87 @@ package com.rbmhtechnology.eventuate import java.util.{ Optional => JOption } -import akka.actor.{ Actor, ActorRef } +import akka.actor.AbstractActor +import akka.actor.Actor.Receive +import akka.actor.ActorRef +import akka.japi.pf.ReceiveBuilder import scala.compat.java8.OptionConverters._ +import scala.runtime.BoxedUnit + +private[eventuate] object ReceiveConverters { + + implicit class ScalaReceiveConverter(receive: Receive) { + def asJava: AbstractActor.Receive = + new AbstractActor.Receive(receive.asInstanceOf[PartialFunction[Any, BoxedUnit]]) + } + + implicit class JavaReceiveConverter(receive: AbstractActor.Receive) { + def asScala: Receive = + receive.onMessage.asInstanceOf[Receive] + } +} + +private[eventuate] object AbstractEventsourcedComponent { + + import ReceiveConverters._ + + /** + * Java API of the actors [[com.rbmhtechnology.eventuate.BehaviorContext]]. + * + * Provides a context for managing behaviors. + */ + final class BehaviorContext private[eventuate] (context: com.rbmhtechnology.eventuate.BehaviorContext) { + + /** + * The initial behavior. + */ + def initial: AbstractActor.Receive = + context.initial.asJava + + /** + * The current behavior. + */ + def current: AbstractActor.Receive = + context.current.asJava + + /** + * Sets the given `behavior` as `current` behavior. Replaces the `current` behavior on the behavior stack. + * + * @param behavior the behavior to set. + */ + def become(behavior: AbstractActor.Receive): Unit = + become(behavior, replace = true) + + /** + * Sets the given `behavior` as `current` behavior. + * + * @param behavior the behavior to set. + * @param replace if `true` (default) replaces the `current` behavior on the behavior stack, if `false` + * pushes `behavior` on the behavior stack. + */ + def become(behavior: AbstractActor.Receive, replace: Boolean): Unit = + context.become(behavior.asScala, replace) + + /** + * Pops the current `behavior` from the behavior stack, making the previous behavior the `current` behavior. + * If the behavior stack contains only a single element, the `current` behavior is reverted to the `initial` + * behavior. + */ + def unbecome(): Unit = + context.unbecome() + } +} + /** - * Java API for actors that implement [[EventsourcedView]]. + * Java API for actors that implement any event-sourced component. * - * Actor handlers may be initialized once in the constructor with various set-methods (e.g. setOnCommand, setOnEvent) - * or by overriding the respective handler (e.g. onCommand(...), onEvent(...)). - * If a handler is overridden, the behaviour of it's respective set-method will be ignored. - * - * Example: - * {{{ - * public class HelloActor extends AbstractEventsourcedView { - * public HelloActor(final String id, final ActorRef eventLog) { - * super(id, eventLog); - * - * onCommand(ReceiveBuilder - * .match(String.class, str -> str.equals("Hello"), value -> sender().tell("World", self()) - * .matchAny(ev -> value -> sender().tell("Please try again", self()) - * .build()); - * } - * - * public PartialFunction onEvent() { - * return ReceiveBuilder - * .matchAny(HelloEvent.class, ev -> handleEvent(ev)) - * .build(); - * } - * } - * }}} - * - * @see [[EventsourcedView]] - * @define akkaReceiveBuilder http://doc.akka.io/japi/akka/2.3.9/akka/japi/pf/ReceiveBuilder.html + * @define akkaReceiveBuilder http://doc.akka.io/japi/akka/2.5.2/akka/japi/pf/ReceiveBuilder.html */ -abstract class AbstractEventsourcedView(val id: String, val eventLog: ActorRef) extends EventsourcedView with ConditionalRequests { - private var commandBehaviour: Option[Receive] = None - private var eventBehaviour: Option[Receive] = None - private var snapshotBehaviour: Option[Receive] = None - private var recoveryHandler: Option[ResultHandler[Unit]] = None +private[eventuate] abstract class AbstractEventsourcedComponent extends EventsourcedView with ConditionalRequests { - override final def onRecovery: Handler[Unit] = onRecover.asScala - - override final def aggregateId: Option[String] = getAggregateId.asScala + import ReceiveConverters._ /** * Java API of [[EventsourcedView.aggregateId aggregateId]]. @@ -69,12 +107,7 @@ abstract class AbstractEventsourcedView(val id: String, val eventLog: ActorRef) */ def getAggregateId: JOption[String] = JOption.empty() - /** - * Java API of [[EventsourcedView.lastEmitterAggregateId lastEmitterAggregateId]]. - * - * @see [[EventsourcedView]] - */ - final def getLastEmitterAggregateId: JOption[String] = lastHandledEvent.emitterAggregateId.asJava + override final def aggregateId: Option[String] = getAggregateId.asScala /** * Java API of [[EventsourcedView.save save]]. @@ -89,38 +122,26 @@ abstract class AbstractEventsourcedView(val id: String, val eventLog: ActorRef) /** * Java API of the [[EventsourcedView.onCommand command]] handler. * - * Returns a partial function that defines the actor's command handling behaviour. + * Returns a receive object that defines the actor's command handling behaviour. * Use [[$akkaReceiveBuilder ReceiveBuilder]] to define the behaviour. * - * Takes precedence over [[setOnCommand]]. - * * @see [[EventsourcedView]] */ - override def onCommand = commandBehaviour.getOrElse(Actor.emptyBehavior) + def createOnCommand(): AbstractActor.Receive = AbstractActor.emptyBehavior - /** - * Java API of the [[EventsourcedView.onEvent event]] handler. - * - * Returns a partial function that defines the actor's event handling behaviour. - * Use [[$akkaReceiveBuilder ReceiveBuilder]] to define the behaviour. - * - * Takes precedence over [[setOnEvent]]. - * - * @see [[EventsourcedView]] - */ - override def onEvent = eventBehaviour.getOrElse(Actor.emptyBehavior) + final override def onCommand: Receive = createOnCommand().asScala /** * Java API of the [[EventsourcedView.onSnapshot snapshot]] handler. * - * Returns a partial function that defines the actor's snapshot handling behaviour. + * Returns a receive object that defines the actor's snapshot handling behaviour. * Use [[$akkaReceiveBuilder ReceiveBuilder]] to define the behaviour. * - * Takes precedence over [[setOnSnapshot]]. - * * @see [[EventsourcedView]] */ - override def onSnapshot = snapshotBehaviour.getOrElse(Actor.emptyBehavior) + def createOnSnapshot(): AbstractActor.Receive = AbstractActor.emptyBehavior + + final override def onSnapshot: Receive = createOnSnapshot().asScala /** * Java API of the [[EventsourcedView.onRecovery recovery]] handler. @@ -128,73 +149,148 @@ abstract class AbstractEventsourcedView(val id: String, val eventLog: ActorRef) * Returns a result handler that defines the actor's recover handling behaviour. * Use [[ResultHandler]] to define the behaviour. * - * Takes precedence over [[setOnRecover]]. - * * @see [[EventsourcedView]] */ - def onRecover: ResultHandler[Unit] = recoveryHandler.getOrElse(ResultHandler.none[Unit]) + def createOnRecovery(): ResultHandler[Unit] = ResultHandler.none[Unit] + + override final def onRecovery: Handler[Unit] = createOnRecovery().asScala + + /** + * Creates a new empty `ReceiveBuilder`. Mimics [[AbstractActor.receiveBuilder]]. + */ + final def receiveBuilder(): ReceiveBuilder = ReceiveBuilder.create() + + /** + * Returns this AbstractActor's ActorContext. Mimics [[AbstractActor.getContext]]. + */ + def getContext: AbstractActor.ActorContext = context.asInstanceOf[AbstractActor.ActorContext] + + /** + * Returns the ActorRef for this actor. Mimics [[AbstractActor.getSelf]]. + */ + def getSelf: ActorRef = self + + /** + * The reference sender Actor of the currently processed message. Mimics [[AbstractActor.getSender]]. + */ + def getSender: ActorRef = sender() /** - * Java API that sets this actor's [[EventsourcedView.onCommand command]] handler. + * Java API of [[EventsourcedView.recovering lastEmirecoveringterId]]. * - * Supplied with a partial function that defines the actor's command handling behaviour. - * Use [[$akkaReceiveBuilder ReceiveBuilder]] to define the behaviour. + * @see [[EventsourcedView]] + */ + final def isRecovering: Boolean = recovering + + /** + * Java API of [[EventsourcedView.lastVectorTimestamp lastVectorTimestamp]]. * - * If [[onCommand]] is implemented, the supplied behaviour is ignored. + * @see [[EventsourcedView]] + */ + final def getLastVectorTimestamp: VectorTime = lastVectorTimestamp + + /** + * Java API of [[EventsourcedView.lastSequenceNr lastSequenceNr]]. * - * @param handler a function that defines this actor's command handling behaviour. * @see [[EventsourcedView]] */ - final protected def setOnCommand(handler: Receive): Unit = - if (commandBehaviour.isEmpty) commandBehaviour = Some(handler) - else throw new IllegalStateException("Actor command behaviour has already been set with setOnCommand(...). " + - "Use commandContext.become(...) to change it.") + final def getLastSequenceNr: Long = lastSequenceNr /** - * Java API that sets this actor's [[EventsourcedView.onEvent event]] handler. + * Java API of [[EventsourcedView.lastSystemTimestamp lastSystemTimestamp]]. * - * Supplied with a partial function that defines the actor's event handling behaviour. - * Use [[$akkaReceiveBuilder ReceiveBuilder]] to define the behaviour. + * @see [[EventsourcedView]] + */ + final def getLastSystemTimestamp: Long = lastSystemTimestamp + + /** + * Java API of [[EventsourcedView.lastEmitterId lastEmitterId]]. * - * If [[onEvent]] is implemented, the supplied behaviour is ignored. + * @see [[EventsourcedView]] + */ + final def getLastEmitterId: String = lastEmitterId + + /** + * Java API of [[EventsourcedView.lastEmitterAggregateId lastEmitterAggregateId]]. * - * @param handler a function that defines this actor's event handling behaviour. * @see [[EventsourcedView]] */ - final protected def setOnEvent(handler: Receive): Unit = - if (eventBehaviour.isEmpty) eventBehaviour = Some(handler) - else throw new IllegalStateException("Actor event behaviour has already been set with setOnEvent(...). " + - "Use eventContext.become(...) to change it.") + final def getLastEmitterAggregateId: JOption[String] = lastEmitterAggregateId.asJava /** - * Java API that sets this actor's [[EventsourcedView.onSnapshot snapshot]] handler. + * Java API of [[EventsourcedView.commandContext commandContext]]. * - * Supplied with a partial function that defines the actor's snapshot handling behaviour. - * Use [[$akkaReceiveBuilder ReceiveBuilder]] to define the behaviour. + * Returns the command [[BehaviorContext]]. + * + * @see [[EventsourcedView]] + */ + final def getCommandContext: AbstractEventsourcedComponent.BehaviorContext = + new AbstractEventsourcedComponent.BehaviorContext(commandContext) + + /** + * Java API of [[EventsourcedView.eventContext eventContext]]. * - * If [[onSnapshot]] is implemented, the supplied behaviour is ignored. + * Returns the event [[BehaviorContext]]. * - * @param handler a function that defines this actor's snapshot handling behaviour. * @see [[EventsourcedView]] */ - final protected def setOnSnapshot(handler: Receive): Unit = - if (snapshotBehaviour.isEmpty) snapshotBehaviour = Some(handler) - else throw new IllegalStateException("Actor snapshot behaviour has already been set with setOnSnapshot(...). " + - "Use snapshotContext.become(...) to change it.") + final def getEventContext: AbstractEventsourcedComponent.BehaviorContext = + new AbstractEventsourcedComponent.BehaviorContext(eventContext) /** - * Java API that sets this actor's [[EventsourcedView.onRecovery recovery]] handler. + * Java API of [[EventsourcedView.snapshotContext snapshotContext]]. * - * Supplied with a result handler that defines the actor's recover handling behaviour. - * Use [[ResultHandler]] to define the behaviour. + * Returns the snapshot [[BehaviorContext]]. * - * If [[onRecover]] is implemented, the supplied behaviour is ignored. + * @see [[EventsourcedView]] + */ + final def getSnapshotContext: AbstractEventsourcedComponent.BehaviorContext = + new AbstractEventsourcedComponent.BehaviorContext(snapshotContext) +} + +/** + * Java API for actors that implement [[EventsourcedView]]. + * + * Actor handlers may be initialized once in the constructor with various set-methods (e.g. setOnCommand, setOnEvent) + * or by overriding the respective handler (e.g. onCommand(...), onEvent(...)). + * If a handler is overridden, the behaviour of it's respective set-method will be ignored. + * + * Example: + * {{{ + * public class HelloActor extends AbstractEventsourcedView { + * public HelloActor(final String id, final ActorRef eventLog) { + * super(id, eventLog); + * + * onCommand(ReceiveBuilder + * .match(String.class, str -> str.equals("Hello"), value -> sender().tell("World", self()) + * .matchAny(ev -> value -> sender().tell("Please try again", self()) + * .build()); + * } + * + * public PartialFunction onEvent() { + * return ReceiveBuilder + * .matchAny(HelloEvent.class, ev -> handleEvent(ev)) + * .build(); + * } + * } + * }}} + * + * @see [[EventsourcedView]] + * @define akkaReceiveBuilder http://doc.akka.io/japi/akka/2.5.2/akka/japi/pf/ReceiveBuilder.html + */ +abstract class AbstractEventsourcedView(val id: String, val eventLog: ActorRef) extends AbstractEventsourcedComponent { + + import ReceiveConverters._ + + /** + * Java API of the [[EventsourcedView.onEvent event]] handler. + * + * Returns a receive object that defines the actor's event handling behaviour. + * Use [[$akkaReceiveBuilder ReceiveBuilder]] to define the behaviour. * - * @param handler a [[ResultHandler]] that defines this actor's recover handling behaviour. * @see [[EventsourcedView]] */ - final protected def setOnRecover(handler: ResultHandler[Unit]): Unit = - if (recoveryHandler.isEmpty) recoveryHandler = Some(handler) - else throw new IllegalStateException("Actor recover behaviour has already been set with setOnRecover(...). " + - "The behaviour can only be set once.") + def createOnEvent(): AbstractActor.Receive = AbstractActor.emptyBehavior + + override final def onEvent: Receive = createOnEvent().asScala } diff --git a/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedActorSpec.java b/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedActorSpec.java index 2923fad6..ea7d4d45 100644 --- a/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedActorSpec.java +++ b/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedActorSpec.java @@ -16,10 +16,10 @@ package com.rbmhtechnology.eventuate; +import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.Props; import akka.japi.Creator; -import akka.japi.pf.ReceiveBuilder; import akka.testkit.TestProbe; import com.rbmhtechnology.eventuate.EventsourcedActorSpec.Cmd; import com.rbmhtechnology.eventuate.EventsourcingProtocol.Write; @@ -33,25 +33,30 @@ import static java.util.stream.Collectors.toList; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; public class AbstractEventsourcedActorSpec extends BaseSpec { public static class TestEventsourcedActor extends AbstractEventsourcedActor { + private final ActorRef mgsProbe; + public TestEventsourcedActor(final String id, final ActorRef logProbe, final ActorRef mgsProbe) { super(id, logProbe); + this.mgsProbe = mgsProbe; + } - setOnCommand(ReceiveBuilder - .match(Cmd.class, cmd -> cmd.num() == 1, cmd -> persist(cmd.payload(), ResultHandler.on( - success -> mgsProbe.tell(cmd.payload(), self()), - failure -> mgsProbe.tell(failure, self()) - ))) - .match(Cmd.class, cmd -> persistN(createN(cmd.payload(), cmd.num()), ResultHandler.on( - success -> mgsProbe.tell(cmd.payload(), self()), - failure -> mgsProbe.tell(failure, self()) - ))) - .build()); + @Override + public AbstractActor.Receive createOnCommand() { + return receiveBuilder() + .match(Cmd.class, cmd -> cmd.num() == 1, cmd -> persist(cmd.payload(), ResultHandler.on( + success -> mgsProbe.tell(cmd.payload(), getSelf()), + failure -> mgsProbe.tell(failure, getSelf()) + ))) + .match(Cmd.class, cmd -> persistN(createN(cmd.payload(), cmd.num()), ResultHandler.on( + success -> mgsProbe.tell(cmd.payload(), getSelf()), + failure -> mgsProbe.tell(failure, getSelf()) + ))) + .build(); } private Collection createN(final Object payload, final int cnt) { diff --git a/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedProcessorSpec.java b/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedProcessorSpec.java index 385d2e04..e0c8242d 100644 --- a/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedProcessorSpec.java +++ b/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedProcessorSpec.java @@ -45,10 +45,13 @@ public static class TestEventsourcedProcessor extends AbstractEventsourcedProces public TestEventsourcedProcessor(final String id, final ActorRef srcProbe, final ActorRef targetProbe, final ActorRef appProbe) { super(id, srcProbe, targetProbe); this.appProbe = appProbe; + } - setOnProcessEvent(ProcessBuilder - .match(String.class, s -> Arrays.asList(s + "-1", s + "-2")) - .build()); + @Override + public Process createOnProcessEvent() { + return processBuilder() + .match(String.class, s -> Arrays.asList(s + "-1", s + "-2")) + .build(); } @Override diff --git a/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedViewSpec.java b/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedViewSpec.java index 64bd6ee7..37ac0f57 100644 --- a/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedViewSpec.java +++ b/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedViewSpec.java @@ -16,10 +16,10 @@ package com.rbmhtechnology.eventuate; +import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.Props; import akka.japi.Creator; -import akka.japi.pf.ReceiveBuilder; import akka.testkit.TestProbe; import com.rbmhtechnology.eventuate.EventsourcedViewSpec.Ping; import com.rbmhtechnology.eventuate.EventsourcedViewSpec.Pong; @@ -32,56 +32,82 @@ import org.junit.Before; import org.junit.Test; import scala.Option; -import scala.PartialFunction; import scala.runtime.BoxedUnit; public class AbstractEventsourcedViewSpec extends BaseSpec { public static class TestEventsourcedView extends AbstractEventsourcedView { + + private final ActorRef msgProbe; + public TestEventsourcedView(final String id, final ActorRef eventProbe, final ActorRef msgProbe) { super(id, eventProbe); + this.msgProbe = msgProbe; + } - setOnCommand(ReceiveBuilder - .match(Ping.class, p -> msgProbe.tell(new Pong(p.i()), self())) - .build()); + @Override + public AbstractActor.Receive createOnCommand() { + return receiveBuilder() + .match(Ping.class, p -> msgProbe.tell(new Pong(p.i()), getSelf())) + .build(); + } - setOnEvent(ReceiveBuilder - .matchAny(ev -> msgProbe.tell(Tuple.of(ev, lastVectorTimestamp(), lastSequenceNr()), self())) - .build()); + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .matchAny(ev -> msgProbe.tell(Tuple.of(ev, getLastVectorTimestamp(), getLastSequenceNr()), getSelf())) + .build(); + } - setOnSnapshot(ReceiveBuilder - .matchAny(s -> msgProbe.tell("snapshot received", self())) - .build()); + @Override + public AbstractActor.Receive createOnSnapshot() { + return receiveBuilder() + .matchAny(s -> msgProbe.tell("snapshot received", getSelf())) + .build(); } } public static class TestCompletionView extends AbstractEventsourcedView { + + private final ActorRef msgProbe; + public TestCompletionView(final String id, final ActorRef eventProbe, final ActorRef msgProbe) { super(id, eventProbe); + this.msgProbe = msgProbe; + } - setOnRecover(ResultHandler.on( - success -> msgProbe.tell("success", self()), - failure -> msgProbe.tell(failure, self()) - )); + @Override + public ResultHandler createOnRecovery() { + return ResultHandler.on( + success -> msgProbe.tell("success", getSelf()), + failure -> msgProbe.tell(failure, getSelf()) + ); } } public static class TestBehaviourView extends AbstractEventsourcedView { + + private final ActorRef msgProbe; + public TestBehaviourView(final String id, final ActorRef eventProbe, final ActorRef msgProbe) { super(id, eventProbe); + this.msgProbe = msgProbe; + } - setOnCommand(ReceiveBuilder - .match(Ping.class, p -> msgProbe.tell(new Pong(p.i()), self())) - .matchEquals("become-ping", c -> commandContext().become(ping(msgProbe), false)) - .matchEquals("unbecome", c -> commandContext().unbecome()) - .build()); + @Override + public AbstractActor.Receive createOnCommand() { + return receiveBuilder() + .match(Ping.class, p -> msgProbe.tell(new Pong(p.i()), getSelf())) + .matchEquals("become-ping", c -> getCommandContext().become(ping(msgProbe), false)) + .matchEquals("unbecome", c -> getCommandContext().unbecome()) + .build(); } - private PartialFunction ping(final ActorRef msgProbe) { - return ReceiveBuilder - .match(Pong.class, p -> msgProbe.tell(new Ping(p.i()), self())) - .matchEquals("unbecome", c -> commandContext().unbecome()) - .build(); + private AbstractActor.Receive ping(final ActorRef msgProbe) { + return receiveBuilder() + .match(Pong.class, p -> msgProbe.tell(new Ping(p.i()), getSelf())) + .matchEquals("unbecome", c -> getCommandContext().unbecome()) + .build(); } } diff --git a/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedWriterSpec.java b/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedWriterSpec.java index f064d4e4..77da5077 100644 --- a/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedWriterSpec.java +++ b/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/AbstractEventsourcedWriterSpec.java @@ -16,6 +16,7 @@ package com.rbmhtechnology.eventuate; +import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.Status; @@ -48,8 +49,13 @@ public TestEventsourcedWriter(final String id, final ActorRef logProbe, final Ac super(id, logProbe); this.appProbe = appProbe; this.rwProbe = rwProbe; + } - setOnEvent(ReceiveBuilder.match(String.class, ev -> {}).build()); + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .match(String.class, ev -> {}) + .build(); } @Override @@ -69,25 +75,25 @@ public CompletionStage onWrite() { @Override public Optional onReadSuccess(final String result) { - appProbe.tell(result, self()); + appProbe.tell(result, getSelf()); return super.onReadSuccess(result); } @Override public void onWriteSuccess(final String result) { - appProbe.tell(result, self()); + appProbe.tell(result, getSelf()); super.onWriteSuccess(result); } @Override public void onReadFailure(final Throwable cause) { - appProbe.tell(cause, self()); + appProbe.tell(cause, getSelf()); super.onReadFailure(cause); } @Override public void onWriteFailure(final Throwable cause) { - appProbe.tell(cause, self()); + appProbe.tell(cause, getSelf()); super.onWriteFailure(cause); } diff --git a/eventuate-example-vertx/src/main/java/com/rbmhtechnology/example/vertx/japi/VertxAdapterExample.java b/eventuate-example-vertx/src/main/java/com/rbmhtechnology/example/vertx/japi/VertxAdapterExample.java index f7cdfacf..4ed5b020 100644 --- a/eventuate-example-vertx/src/main/java/com/rbmhtechnology/example/vertx/japi/VertxAdapterExample.java +++ b/eventuate-example-vertx/src/main/java/com/rbmhtechnology/example/vertx/japi/VertxAdapterExample.java @@ -16,11 +16,11 @@ package com.rbmhtechnology.example.vertx.japi; +import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.japi.Util; -import akka.japi.pf.ReceiveBuilder; import com.rbmhtechnology.eventuate.AbstractEventsourcedView; import com.rbmhtechnology.eventuate.ApplicationVersion; import com.rbmhtechnology.eventuate.EndpointFilters$; @@ -171,29 +171,37 @@ public static class EventLogReader extends AbstractEventsourcedView { private List subscribers = List.empty(); private int eventsRead = 0; + private final int eventCount; public EventLogReader(String id, ActorRef eventLog, int eventCount) { super(id, eventLog); + this.eventCount = eventCount; + } - setOnCommand(ReceiveBuilder - .matchEquals("notifyOnComplete", s -> subscribers = subscribers.prepend(sender())) + @Override + public AbstractActor.Receive createOnCommand() { + return receiveBuilder() + .matchEquals("notifyOnComplete", s -> subscribers = subscribers.prepend(getSender())) .matchEquals("eventRead", e -> { eventsRead = eventsRead + 1; if (eventsRead == eventCount) { - subscribers.forEach(s -> s.tell("finished", self())); + subscribers.forEach(s -> s.tell("finished", getSelf())); } }) - .build()); + .build(); + } - setOnEvent(ReceiveBuilder + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() .matchAny(ev -> { out.println(String.format("[e_reader] received [%s]", ev)); - if (!recovering()) { - self().tell("eventRead", self()); + if (!isRecovering()) { + getSelf().tell("eventRead", getSelf()); } }) - .build()); + .build(); } } diff --git a/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderActor.java b/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderActor.java index 9cb72d17..f65788c8 100644 --- a/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderActor.java +++ b/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderActor.java @@ -16,8 +16,8 @@ package com.rbmhtechnology.example.japi.ordermgnt; +import akka.actor.AbstractActor; import akka.actor.ActorRef; -import akka.japi.pf.ReceiveBuilder; import com.rbmhtechnology.eventuate.AbstractEventsourcedActor; import com.rbmhtechnology.eventuate.ConcurrentVersions; import com.rbmhtechnology.eventuate.ConcurrentVersionsTree; @@ -25,6 +25,7 @@ import com.rbmhtechnology.eventuate.SnapshotMetadata; import com.rbmhtechnology.eventuate.Versioned; import com.rbmhtechnology.eventuate.VersionedAggregate; +import scala.runtime.BoxedUnit; import java.util.Collections; import java.util.HashMap; @@ -71,43 +72,6 @@ public OrderActor(String orderId, String replicaId, ActorRef eventLog) { this.orderId = orderId; this.replicaId = replicaId; this.order = VersionedAggregate.create(orderId, commandValidation, eventProjection, OrderDomainCmd.instance, OrderDomainEvt.instance); - - setOnCommand(ReceiveBuilder - .match(CreateOrder.class, c -> order.validateCreate(c, processCommand(orderId, sender(), self()))) - .match(OrderCommand.class, c -> order.validateUpdate(c, processCommand(orderId, sender(), self()))) - .match(Resolve.class, c -> order.validateResolve(c.selected(), replicaId, processCommand(orderId, sender(), self()))) - .match(GetState.class, c -> sender().tell(createStateFromAggregate(orderId, order), self())) - .match(SaveSnapshot.class, c -> saveState(sender(), self())) - .build()); - - setOnEvent(ReceiveBuilder - .match(OrderCreated.class, e -> { - order = order.handleCreated(e, lastVectorTimestamp(), lastSequenceNr()); - if (!recovering()) printOrder(order.getVersions()); - }) - .match(OrderEvent.class, e -> { - order = order.handleUpdated(e, lastVectorTimestamp(), lastSequenceNr()); - if (!recovering()) printOrder(order.getVersions()); - }) - .match(Resolved.class, e -> { - order = order.handleResolved(e, lastVectorTimestamp(), lastSequenceNr()); - if (!recovering()) printOrder(order.getVersions()); - }) - .build()); - - setOnSnapshot(ReceiveBuilder - .match(ConcurrentVersionsTree.class, s -> { - order = order.withAggregate(((ConcurrentVersionsTree) s).withProjection(eventProjection)); - System.out.println(String.format("[%s] Snapshot loaded:", orderId)); - printOrder(order.getVersions()); - }) - .build()); - - setOnRecover(ResultHandler - .onSuccess(v -> { - System.out.println(String.format("[%s] Recovery complete:", orderId)); - printOrder(order.getVersions()); - })); } @Override @@ -115,6 +79,17 @@ public Optional getAggregateId() { return Optional.of(orderId); } + @Override + public AbstractActor.Receive createOnCommand() { + return receiveBuilder() + .match(CreateOrder.class, c -> order.validateCreate(c, processCommand(orderId, getSender(), getSelf()))) + .match(OrderCommand.class, c -> order.validateUpdate(c, processCommand(orderId, getSender(), getSelf()))) + .match(Resolve.class, c -> order.validateResolve(c.selected(), replicaId, processCommand(orderId, getSender(), getSelf()))) + .match(GetState.class, c -> getSender().tell(createStateFromAggregate(orderId, order), getSelf())) + .match(SaveSnapshot.class, c -> saveState(getSender(), getSelf())) + .build(); + } + private ResultHandler processCommand(final String orderId, final ActorRef sender, final ActorRef self) { return ResultHandler.on( evt -> processEvent(evt, sender, self), @@ -122,18 +97,11 @@ private ResultHandler processCommand(final String orderId, final ActorRef ); } - private void processEvent(final E event, final ActorRef sender, final ActorRef self) { - persist(event, ResultHandler.on( - evt -> sender.tell(new CommandSuccess(orderId), self), - err -> sender.tell(new CommandFailure(orderId, err), self) - )); - } - private void saveState(final ActorRef sender, final ActorRef self) { if (order.getAggregate().isPresent()) { save(order.getAggregate().get(), ResultHandler.on( - metadata -> sender.tell(new SaveSnapshotSuccess(orderId, metadata), self), - err -> sender.tell(new SaveSnapshotFailure(orderId, err), self) + metadata -> sender.tell(new SaveSnapshotSuccess(orderId, metadata), self), + err -> sender.tell(new SaveSnapshotFailure(orderId, err), self) )); } else { sender.tell(new SaveSnapshotFailure(orderId, new AggregateDoesNotExistException(orderId)), self); @@ -152,6 +120,51 @@ private Function>, Map>>> ge }; } + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .match(OrderCreated.class, e -> { + order = order.handleCreated(e, getLastVectorTimestamp(), getLastSequenceNr()); + if (!isRecovering()) printOrder(order.getVersions()); + }) + .match(OrderEvent.class, e -> { + order = order.handleUpdated(e, getLastVectorTimestamp(), getLastSequenceNr()); + if (!isRecovering()) printOrder(order.getVersions()); + }) + .match(Resolved.class, e -> { + order = order.handleResolved(e, getLastVectorTimestamp(), getLastSequenceNr()); + if (!isRecovering()) printOrder(order.getVersions()); + }) + .build(); + } + + private void processEvent(final E event, final ActorRef sender, final ActorRef self) { + persist(event, ResultHandler.on( + evt -> sender.tell(new CommandSuccess(orderId), self), + err -> sender.tell(new CommandFailure(orderId, err), self) + )); + } + + @Override + public AbstractActor.Receive createOnSnapshot() { + return receiveBuilder() + .match(ConcurrentVersionsTree.class, s -> { + order = order.withAggregate(((ConcurrentVersionsTree) s).withProjection(eventProjection)); + System.out.println(String.format("[%s] Snapshot loaded:", orderId)); + printOrder(order.getVersions()); + }) + .build(); + } + + @Override + public ResultHandler createOnRecovery() { + return ResultHandler + .onSuccess(v -> { + System.out.println(String.format("[%s] Recovery complete:", orderId)); + printOrder(order.getVersions()); + }); + } + static void printOrder(List> versions) { if (versions.size() > 1) { System.out.println("Conflict:"); diff --git a/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderExample.java b/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderExample.java index c99a0cf7..b6d7aa83 100644 --- a/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderExample.java +++ b/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderExample.java @@ -57,8 +57,11 @@ public OrderExample(ActorRef manager, ActorRef view) { this.view = view; this.reader = new BufferedReader(new InputStreamReader(System.in)); + } - receive(ReceiveBuilder + @Override + public Receive createReceive() { + return receiveBuilder() .match(GetStateSuccess.class, r -> { r.state.values().stream().forEach(OrderActor::printOrder); prompt(); @@ -90,7 +93,7 @@ public OrderExample(ActorRef manager, ActorRef view) { prompt(); }) .match(CommandSuccess.class, r -> prompt()) - .match(String.class, this::process).build()); + .match(String.class, this::process).build(); } private void prompt() throws IOException { diff --git a/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderManager.java b/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderManager.java index 33d6102e..e28ccbb3 100644 --- a/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderManager.java +++ b/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderManager.java @@ -16,10 +16,10 @@ package com.rbmhtechnology.example.japi.ordermgnt; +import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.Props; import akka.dispatch.Futures; -import akka.japi.pf.ReceiveBuilder; import akka.pattern.Patterns; import com.rbmhtechnology.eventuate.AbstractEventsourcedView; import javaslang.collection.HashMap; @@ -49,18 +49,17 @@ public OrderManager(String replicaId, ActorRef eventLog) { super(String.format("j-om-%s", replicaId), eventLog); this.replicaId = replicaId; this.orderActors = HashMap.empty(); + } - setOnCommand(ReceiveBuilder - .match(OrderCommand.class, c -> orderActor(c.orderId).tell(c, sender())) - .match(SaveSnapshot.class, c -> orderActor(c.orderId).tell(c, sender())) - .match(Resolve.class, c -> orderActor(c.id()).tell(c, sender())) - .match(GetState.class, c -> orderActors.isEmpty(), c -> replyStateZero(sender())) - .match(GetState.class, c -> !orderActors.isEmpty(), c -> replyState(sender())) - .build()); - - setOnEvent(ReceiveBuilder - .match(OrderCreated.class, e -> !orderActors.containsKey(e.orderId), e -> orderActor(e.orderId)) - .build()); + @Override + public AbstractActor.Receive createOnCommand() { + return receiveBuilder() + .match(OrderCommand.class, c -> orderActor(c.orderId).tell(c, getSender())) + .match(SaveSnapshot.class, c -> orderActor(c.orderId).tell(c, getSender())) + .match(Resolve.class, c -> orderActor(c.id()).tell(c, getSender())) + .match(GetState.class, c -> orderActors.isEmpty(), c -> replyStateZero(getSender())) + .match(GetState.class, c -> !orderActors.isEmpty(), c -> replyState(getSender())) + .build(); } private ActorRef orderActor(final String orderId) { @@ -73,7 +72,7 @@ private ActorRef orderActor(final String orderId) { } private void replyStateZero(ActorRef target) { - target.tell(GetStateSuccess.empty(), self()); + target.tell(GetStateSuccess.empty(), getSelf()); } private void replyState(ActorRef target) { @@ -83,9 +82,9 @@ private void replyState(ActorRef target) { .map(func(this::toStateSuccess), dispatcher) .onComplete(proc(result -> { if (result.isSuccess()) { - target.tell(result.get(), self()); + target.tell(result.get(), getSelf()); } else { - target.tell(new GetStateFailure(result.failed().get()), self()); + target.tell(new GetStateFailure(result.failed().get()), getSelf()); } }), dispatcher); } @@ -101,4 +100,11 @@ private Future asyncGetState(final ActorRef actor) { private GetStateSuccess toStateSuccess(final Iterable states) { return StreamSupport.stream(states.spliterator(), false).reduce(GetStateSuccess::merge).get(); } + + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .match(OrderCreated.class, e -> !orderActors.containsKey(e.orderId), e -> orderActor(e.orderId)) + .build(); + } } diff --git a/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderView.java b/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderView.java index f5df5d4a..2448fc56 100644 --- a/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderView.java +++ b/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/ordermgnt/OrderView.java @@ -16,8 +16,8 @@ package com.rbmhtechnology.example.japi.ordermgnt; +import akka.actor.AbstractActor; import akka.actor.ActorRef; -import akka.japi.pf.ReceiveBuilder; import com.rbmhtechnology.eventuate.AbstractEventsourcedView; import com.rbmhtechnology.example.japi.ordermgnt.OrderActor.OrderEvent; import javaslang.collection.HashMap; @@ -29,19 +29,25 @@ public class OrderView extends AbstractEventsourcedView { public OrderView(String replicaId, ActorRef eventLog) { super(String.format("j-ov-%s", replicaId), eventLog); this.updateCounts = HashMap.empty(); + } - setOnCommand(ReceiveBuilder - .match(GetUpdateCount.class, this::handleGetUpdateCount) - .build()); - - setOnEvent(ReceiveBuilder - .match(OrderEvent.class, this::handleOrderEvent) - .build()); + @Override + public AbstractActor.Receive createOnCommand() { + return receiveBuilder() + .match(GetUpdateCount.class, this::handleGetUpdateCount) + .build(); } public void handleGetUpdateCount(final GetUpdateCount cmd) { final String orderId = cmd.orderId; - sender().tell(new GetUpdateCountSuccess(orderId, updateCounts.get(orderId).getOrElse(0)), self()); + sender().tell(new GetUpdateCountSuccess(orderId, updateCounts.get(orderId).getOrElse(0)), getSelf()); + } + + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .match(OrderEvent.class, this::handleOrderEvent) + .build(); } public void handleOrderEvent(final OrderEvent evt) { diff --git a/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/querydb/Emitter.java b/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/querydb/Emitter.java index cc35ef3c..24972572 100644 --- a/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/querydb/Emitter.java +++ b/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/querydb/Emitter.java @@ -16,8 +16,8 @@ package com.rbmhtechnology.example.japi.querydb; +import akka.actor.AbstractActor; import akka.actor.ActorRef; -import akka.japi.pf.ReceiveBuilder; import com.rbmhtechnology.eventuate.AbstractEventsourcedActor; import com.rbmhtechnology.eventuate.ResultHandler; @@ -29,28 +29,32 @@ public class Emitter extends AbstractEventsourcedActor { public Emitter(String id, ActorRef eventLog) { super(id, eventLog); + } + + @Override + public AbstractActor.Receive createOnCommand() { + return receiveBuilder() + .match(CreateCustomer.class, + cmd -> persist(new CustomerCreated(highestCustomerId + 1, cmd.first, cmd.last, cmd.address), ResultHandler.on( + c -> getSender().tell(c, getSelf()), + this::handleFailure + ))) + .match(UpdateAddress.class, cmd -> cmd.cid <= highestCustomerId, + cmd -> persist(new AddressUpdated(cmd.cid, cmd.address), ResultHandler.on( + c -> getSender().tell(c, getSelf()), + this::handleFailure + ))) + .match(UpdateAddress.class, + cmd -> getSender().tell(new Exception(String.format("Customer with %s does not exist", cmd.cid)), getSelf()) + ) + .build(); + } - setOnCommand(ReceiveBuilder - .match(CreateCustomer.class, - cmd -> persist(new CustomerCreated(highestCustomerId + 1, cmd.first, cmd.last, cmd.address), ResultHandler.on( - c -> sender().tell(c, self()), - this::handleFailure - ))) - .match(UpdateAddress.class, cmd -> cmd.cid <= highestCustomerId, - cmd -> persist(new AddressUpdated(cmd.cid, cmd.address), ResultHandler.on( - c -> sender().tell(c, self()), - this::handleFailure - ))) - .match(UpdateAddress.class, - cmd -> sender().tell(new Exception(String.format("Customer with %s does not exist", cmd.cid)), self()) - ) - .build()); - - setOnEvent(ReceiveBuilder - .match(CustomerCreated.class, - evt -> highestCustomerId = evt.cid - ) - .build()); + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .match(CustomerCreated.class, evt -> highestCustomerId = evt.cid) + .build(); } private void handleFailure(final Throwable failure) { diff --git a/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/querydb/Writer.java b/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/querydb/Writer.java index be8202ab..8bad3e08 100644 --- a/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/querydb/Writer.java +++ b/eventuate-examples/src/main/java/com/rbmhtechnology/example/japi/querydb/Writer.java @@ -16,8 +16,8 @@ package com.rbmhtechnology.example.japi.querydb; +import akka.actor.AbstractActor; import akka.actor.ActorRef; -import akka.japi.pf.ReceiveBuilder; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; @@ -49,11 +49,14 @@ public Writer(final String id, final ActorRef eventLog, final Session session) { insertCustomerStmt = session.prepare("INSERT INTO CUSTOMER (id, first, last, address) VALUES (?, ?, ?, ?)"); updateCustomerStmt = session.prepare("UPDATE CUSTOMER SET address = ? WHERE id = ?"); updateProgressStmt = session.prepare("UPDATE PROGRESS SET sequence_nr = ? WHERE id = 0"); + } - setOnEvent(ReceiveBuilder - .match(CustomerCreated.class, c -> batch = batch.append(insertCustomerStmt.bind(c.cid, c.first, c.last, c.address))) - .match(AddressUpdated.class, u -> batch = batch.append(updateCustomerStmt.bind(u.address, u.cid))) - .build()); + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .match(CustomerCreated.class, c -> batch = batch.append(insertCustomerStmt.bind(c.cid, c.first, c.last, c.address))) + .match(AddressUpdated.class, u -> batch = batch.append(updateCustomerStmt.bind(u.address, u.cid))) + .build(); } @Override @@ -63,7 +66,7 @@ public int replayBatchSize() { @Override public CompletionStage onWrite() { - final Long snr = lastSequenceNr(); + final Long snr = getLastSequenceNr(); final CompletableFuture res = sequence(batch.map(session::executeAsync).map(this::toFuture)) .thenCompose(rs -> toFuture(session.executeAsync(updateProgressStmt.bind(snr)))) .thenApply(rs -> null); diff --git a/project/ProjectDependencies.scala b/project/ProjectDependencies.scala index 46004141..12428956 100644 --- a/project/ProjectDependencies.scala +++ b/project/ProjectDependencies.scala @@ -17,7 +17,7 @@ import sbt._ object ProjectDependencyVersions { - val AkkaVersion = "2.4.12" + val AkkaVersion = "2.5.7" val CassandraVersion = "3.4" val Log4jVersion = "2.5" val ProtobufVersion = "2.5.0" diff --git a/src/sphinx/code/userguide/japi/ActorExample.java b/src/sphinx/code/userguide/japi/ActorExample.java index 0d5b5f0b..0e4d1097 100644 --- a/src/sphinx/code/userguide/japi/ActorExample.java +++ b/src/sphinx/code/userguide/japi/ActorExample.java @@ -17,10 +17,10 @@ package userguide.japi; //#event-sourced-actor +import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; -import akka.japi.pf.ReceiveBuilder; import com.rbmhtechnology.eventuate.AbstractEventsourcedActor; import com.rbmhtechnology.eventuate.ReplicationConnection; import com.rbmhtechnology.eventuate.ResultHandler; @@ -45,18 +45,6 @@ class ExampleActor extends AbstractEventsourcedActor { public ExampleActor(String id, Optional aggregateId, ActorRef eventLog) { super(id, eventLog); this.aggregateId = aggregateId; - - setOnCommand(ReceiveBuilder - .match(Print.class, cmd -> printState(id, currentState)) - .match(Append.class, cmd -> persist(new Appended(cmd.entry), ResultHandler.on( - evt -> sender().tell(new AppendSuccess(evt.entry), self()), - err -> sender().tell(new AppendFailure(err), self()) - ))) - .build()); - - setOnEvent(ReceiveBuilder - .match(Appended.class, evt -> currentState = append(currentState, evt.entry)) - .build()); } @Override @@ -64,6 +52,24 @@ public Optional getAggregateId() { return aggregateId; } + @Override + public AbstractActor.Receive createOnCommand() { + return receiveBuilder() + .match(Print.class, cmd -> printState(id(), currentState)) + .match(Append.class, cmd -> persist(new Appended(cmd.entry), ResultHandler.on( + evt -> getSender().tell(new AppendSuccess(evt.entry), getSelf()), + err -> getSender().tell(new AppendFailure(err), getSelf()) + ))) + .build(); + } + + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .match(Appended.class, evt -> currentState = append(currentState, evt.entry)) + .build(); + } + private void printState(String id, Collection currentState) { out.println(String.format("[id = %s, aggregate id = %s] %s", id, getAggregateId().orElseGet(() -> "undefined"), String.join(",", currentState))); diff --git a/src/sphinx/code/userguide/japi/CommunicationExample.java b/src/sphinx/code/userguide/japi/CommunicationExample.java index f5d2b98a..314223d1 100644 --- a/src/sphinx/code/userguide/japi/CommunicationExample.java +++ b/src/sphinx/code/userguide/japi/CommunicationExample.java @@ -17,10 +17,10 @@ package userguide.japi; //#event-driven-communication +import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; -import akka.japi.pf.ReceiveBuilder; import com.rbmhtechnology.eventuate.AbstractEventsourcedActor; import com.rbmhtechnology.eventuate.ResultHandler; import static akka.actor.ActorRef.noSender; @@ -32,17 +32,26 @@ public class CommunicationExample { class PingActor extends AbstractEventsourcedActor { + private final ActorRef completion; + public PingActor(String id, ActorRef eventLog, ActorRef completion) { super(id, eventLog); + this.completion = completion; + } - setOnCommand(ReceiveBuilder - .matchEquals("serve", cmd -> persist(new Ping(1), ResultHandler.none())) - .build()); + @Override + public AbstractActor.Receive createOnCommand() { + return receiveBuilder() + .matchEquals("serve", cmd -> persist(new Ping(1), ResultHandler.none())) + .build(); + } - setOnEvent(ReceiveBuilder - .match(Pong.class, evt -> evt.num == 10 && !recovering(), evt -> completion.tell("done", self())) - .match(Pong.class, evt -> persistOnEvent(new Ping(evt.num + 1))) - .build()); + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .match(Pong.class, evt -> evt.num == 10 && !isRecovering(), evt -> completion.tell("done", getSelf())) + .match(Pong.class, evt -> persistOnEvent(new Ping(evt.num + 1))) + .build(); } } @@ -50,10 +59,13 @@ class PongActor extends AbstractEventsourcedActor { public PongActor(String id, ActorRef eventLog) { super(id, eventLog); + } - setOnEvent(ReceiveBuilder - .match(Ping.class, evt -> persistOnEvent(new Pong(evt.num))) - .build()); + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .match(Ping.class, evt -> persistOnEvent(new Pong(evt.num))) + .build(); } } diff --git a/src/sphinx/code/userguide/japi/ConcurrentExample.java b/src/sphinx/code/userguide/japi/ConcurrentExample.java index b4c7ea40..f1a990e1 100644 --- a/src/sphinx/code/userguide/japi/ConcurrentExample.java +++ b/src/sphinx/code/userguide/japi/ConcurrentExample.java @@ -19,8 +19,8 @@ import static userguide.japi.DocUtils.append; //#detecting-concurrent-update +import akka.actor.AbstractActor; import akka.actor.ActorRef; -import akka.japi.pf.ReceiveBuilder; import com.rbmhtechnology.eventuate.AbstractEventsourcedActor; import com.rbmhtechnology.eventuate.VectorTime; @@ -30,29 +30,32 @@ public class ConcurrentExample { - //#detecting-concurrent-update - - class ExampleActor extends AbstractEventsourcedActor { - - private Collection currentState = Collections.emptyList(); - private VectorTime updateTimestamp = VectorTime.Zero(); - - public ExampleActor(String id, ActorRef eventLog) { - super(id, eventLog); - - setOnEvent(ReceiveBuilder - .match(Appended.class, evt -> { - if (updateTimestamp.lt(lastVectorTimestamp())) { - // regular update - currentState = append(currentState, evt.entry); - updateTimestamp = lastVectorTimestamp(); - } else if (updateTimestamp.conc(lastVectorTimestamp())) { - // concurrent update - // TODO: track conflicting versions - } - }) - .build()); + //#detecting-concurrent-update + + class ExampleActor extends AbstractEventsourcedActor { + + private Collection currentState = Collections.emptyList(); + private VectorTime updateTimestamp = VectorTime.Zero(); + + public ExampleActor(String id, ActorRef eventLog) { + super(id, eventLog); + } + + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .match(Appended.class, evt -> { + if (updateTimestamp.lt(getLastVectorTimestamp())) { + // regular update + currentState = append(currentState, evt.entry); + updateTimestamp = getLastVectorTimestamp(); + } else if (updateTimestamp.conc(getLastVectorTimestamp())) { + // concurrent update + // TODO: track conflicting versions + } + }) + .build(); + } } - } - //# + //# } diff --git a/src/sphinx/code/userguide/japi/ConditionalExample.java b/src/sphinx/code/userguide/japi/ConditionalExample.java index df000602..a061f354 100644 --- a/src/sphinx/code/userguide/japi/ConditionalExample.java +++ b/src/sphinx/code/userguide/japi/ConditionalExample.java @@ -17,13 +17,14 @@ package userguide.japi; import static userguide.japi.DocUtils.append; + import userguide.japi.ViewExample.GetAppendCountReply; //#conditional-requests +import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; -import akka.japi.pf.ReceiveBuilder; import akka.pattern.Patterns; import akka.util.Timeout; import com.rbmhtechnology.eventuate.*; @@ -53,23 +54,29 @@ class ExampleActor extends AbstractEventsourcedActor { public ExampleActor(String id, ActorRef eventLog) { super(id, eventLog); this.id = id; - - setOnCommand(ReceiveBuilder - .match(Append.class, cmd -> persist(new Appended(cmd.entry), ResultHandler.onSuccess( - evt -> sender().tell(new AppendSuccess(evt.entry, lastVectorTimestamp()), self()) - ))) - // ... - .build()); - - setOnEvent(ReceiveBuilder - .match(Appended.class, evt -> currentState = append(currentState, evt.entry)) - .build()); } @Override public Optional getAggregateId() { return Optional.of(id); } + + @Override + public AbstractActor.Receive createOnCommand() { + return receiveBuilder() + .match(Append.class, cmd -> persist(new Appended(cmd.entry), ResultHandler.onSuccess( + evt -> getSender().tell(new AppendSuccess(evt.entry, getLastVectorTimestamp()), getSelf()) + ))) + // ... + .build(); + } + + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .match(Appended.class, evt -> currentState = append(currentState, evt.entry)) + .build(); + } } // Command diff --git a/src/sphinx/code/userguide/japi/ResolveExample.java b/src/sphinx/code/userguide/japi/ResolveExample.java index 6f925aff..b1d2e8dc 100644 --- a/src/sphinx/code/userguide/japi/ResolveExample.java +++ b/src/sphinx/code/userguide/japi/ResolveExample.java @@ -18,8 +18,8 @@ //#automated-conflict-resolution +import akka.actor.AbstractActor; import akka.actor.ActorRef; -import akka.japi.pf.ReceiveBuilder; import com.rbmhtechnology.eventuate.*; import java.util.Collection; @@ -43,25 +43,28 @@ class ExampleActor extends AbstractEventsourcedActor { public ExampleActor(String id, ActorRef eventLog) { super(id, eventLog); + } - setOnEvent(ReceiveBuilder - .match(Appended.class, evt -> { - versionedState = versionedState.update(evt.entry, lastVectorTimestamp(), lastSystemTimestamp(), lastEmitterId()); - - if (versionedState.conflict()) { - final Stream>> conflictingVersions = versionedState.getAll().stream() - .sorted((v1, v2) -> { - if (v1.systemTimestamp() == v2.systemTimestamp()) { - return v1.creator().compareTo(v2.creator()); - } - return v1.systemTimestamp() > v2.systemTimestamp() ? -1 : 1; - }); - - final VectorTime winnerTimestamp = conflictingVersions.findFirst().get().vectorTimestamp(); - versionedState = versionedState.resolve(winnerTimestamp); - } - }) - .build()); + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .match(Appended.class, evt -> { + versionedState = versionedState.update(evt.entry, getLastVectorTimestamp(), getLastSystemTimestamp(), getLastEmitterId()); + + if (versionedState.conflict()) { + final Stream>> conflictingVersions = versionedState.getAll().stream() + .sorted((v1, v2) -> { + if (v1.systemTimestamp() == v2.systemTimestamp()) { + return v1.creator().compareTo(v2.creator()); + } + return v1.systemTimestamp() > v2.systemTimestamp() ? -1 : 1; + }); + + final VectorTime winnerTimestamp = conflictingVersions.findFirst().get().vectorTimestamp(); + versionedState = versionedState.resolve(winnerTimestamp); + } + }) + .build(); } } //# @@ -78,28 +81,34 @@ class ExampleActor extends AbstractEventsourcedActor { public ExampleActor(String id, ActorRef eventLog) { super(id, eventLog); + } + + @Override + public AbstractActor.Receive createOnCommand() { + return receiveBuilder() + .match(Append.class, cmd -> versionedState.conflict(), + cmd -> getSender().tell(new AppendRejected(cmd.entry, versionedState.getAll()), getSelf()) + ) + .match(Append.class, cmd -> { + // .... + }) + .match(Resolve.class, cmd -> persist(new Resolved(cmd.selectedTimestamp), ResultHandler.on( + evt -> { /* reply to sender omitted ... */ }, + err -> { /* reply to sender omitted ... */ } + ))) + .build(); + } - setOnCommand(ReceiveBuilder - .match(Append.class, cmd -> versionedState.conflict(), - cmd -> sender().tell(new AppendRejected(cmd.entry, versionedState.getAll()), self()) - ) - .match(Append.class, cmd -> { - // .... - }) - .match(Resolve.class, cmd -> persist(new Resolved(cmd.selectedTimestamp), ResultHandler.on( - evt -> { /* reply to sender omitted ... */ }, - err -> { /* reply to sender omitted ... */ } - ))) - .build()); - - setOnEvent(ReceiveBuilder - .match(Appended.class, evt -> - versionedState = versionedState.update(evt.entry, lastVectorTimestamp(), lastSystemTimestamp(), lastEmitterId()) - ) - .match(Resolved.class, evt -> - versionedState = versionedState.resolve(evt.selectedTimestamp, lastVectorTimestamp(), lastSystemTimestamp()) - ) - .build()); + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .match(Appended.class, evt -> + versionedState = versionedState.update(evt.entry, getLastVectorTimestamp(), getLastSystemTimestamp(), getLastEmitterId()) + ) + .match(Resolved.class, evt -> + versionedState = versionedState.resolve(evt.selectedTimestamp, getLastVectorTimestamp(), getLastSystemTimestamp()) + ) + .build(); } } diff --git a/src/sphinx/code/userguide/japi/TrackingExample.java b/src/sphinx/code/userguide/japi/TrackingExample.java index a14356b2..ba72140d 100644 --- a/src/sphinx/code/userguide/japi/TrackingExample.java +++ b/src/sphinx/code/userguide/japi/TrackingExample.java @@ -20,8 +20,8 @@ //#tracking-conflicting-versions +import akka.actor.AbstractActor; import akka.actor.ActorRef; -import akka.japi.pf.ReceiveBuilder; import com.rbmhtechnology.eventuate.*; import java.util.Collection; @@ -39,21 +39,24 @@ class ExampleActor extends AbstractEventsourcedActor { public ExampleActor(String id, ActorRef eventLog) { super(id, eventLog); - - setOnEvent(ReceiveBuilder - .match(Appended.class, evt -> { - versionedState = versionedState.update(evt.entry, lastVectorTimestamp(), lastSystemTimestamp(), lastEmitterId()); - - if (versionedState.conflict()) { - final Collection>> all = versionedState.getAll(); - // TODO: resolve conflicting versions - } else { - final Collection currentState = versionedState.getAll().get(0).value(); - // ... - } - }) - .build()); } + + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .match(Appended.class, evt -> { + versionedState = versionedState.update(evt.entry, getLastVectorTimestamp(), getLastSystemTimestamp(), getLastEmitterId()); + + if (versionedState.conflict()) { + final Collection>> all = versionedState.getAll(); + // TODO: resolve conflicting versions + } else { + final Collection currentState = versionedState.getAll().get(0).value(); + // ... + } + }) + .build(); + } } //# } diff --git a/src/sphinx/code/userguide/japi/ViewExample.java b/src/sphinx/code/userguide/japi/ViewExample.java index 6fd78533..968ac63a 100644 --- a/src/sphinx/code/userguide/japi/ViewExample.java +++ b/src/sphinx/code/userguide/japi/ViewExample.java @@ -18,8 +18,8 @@ //#event-sourced-view +import akka.actor.AbstractActor; import akka.actor.ActorRef; -import akka.japi.pf.ReceiveBuilder; import com.rbmhtechnology.eventuate.AbstractEventsourcedView; import com.rbmhtechnology.eventuate.VectorTime; //# @@ -35,16 +35,22 @@ class ExampleView extends AbstractEventsourcedView { public ExampleView(String id, ActorRef eventLog) { super(id, eventLog); + } - setOnCommand(ReceiveBuilder - .match(GetAppendCount.class, cmd -> sender().tell(new GetAppendCountReply(appendCount), self())) - .match(GetResolveCount.class, cmd -> sender().tell(new GetResolveCountReply(resolveCount), self())) - .build()); + @Override + public AbstractActor.Receive createOnCommand() { + return receiveBuilder() + .match(GetAppendCount.class, cmd -> sender().tell(new GetAppendCountReply(appendCount), getSelf())) + .match(GetResolveCount.class, cmd -> sender().tell(new GetResolveCountReply(resolveCount), getSelf())) + .build(); + } - setOnEvent(ReceiveBuilder - .match(Appended.class, evt -> appendCount += 1) - .match(Resolved.class, evt -> resolveCount += 1) - .build()); + @Override + public AbstractActor.Receive createOnEvent() { + return receiveBuilder() + .match(Appended.class, evt -> appendCount += 1) + .match(Resolved.class, evt -> resolveCount += 1) + .build(); } }