Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Commit

Permalink
Upgrade to Akka 2.5.7 (#401)
Browse files Browse the repository at this point in the history
Akka 2.5.7 introduced the following incompatible changes in the Java-API
- ReceiveBuilder:
  - explicit create step required
  - builds `AbstractActor.Receive` instead of `Actor.Receive`
- Match.match: Changes in type parameters

To adjust to these changes the Java-API of Eventuate is streamlined
and mirrors the new Akka Java-API by removing all `setOn...`-methods
used to defined the actor's behavior and replaces these methods with
`createOn...`-variants which can be used to define custom behavior
by returning the behavior definition wrapped in an instance of
`AbstractActor.Receive`.

New Java-specific accessor methods are introduced to access the
properties of any event-sourced component with the Java-API.

`AbstractEventsourcedProcessor` supports behavior definition by overriding
the `createOnProcess` method which facilitates a Java-specific `Process` type
to define custom behavior.
The `ProcessBuilder` may be used to create instances of type `Process`.

A Java-specific `BehaviorContext` is added to the Java-API of al all
event-sourced components which can be retrieved by calling `getContext()`.
  • Loading branch information
Christoph Stumpf authored Nov 28, 2017
1 parent 4d6624a commit e41743c
Show file tree
Hide file tree
Showing 23 changed files with 709 additions and 441 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,66 +17,115 @@
package com.rbmhtechnology.eventuate;

import akka.japi.pf.FI;
import akka.japi.pf.Match;
import akka.japi.pf.PFBuilder;

/**
* Java API for building a PartialFunction that matches arbitrary [[Object]]s to {@link Iterable}s.
* Java API for building a {@link AbstractEventsourcedProcessor.Process} behavior that matches arbitrary [[Object]]s to {@link Iterable}s.
*
* Can be used to define processing behaviour in {@link AbstractEventsourcedProcessor#setOnProcessEvent}
* or {@link AbstractEventsourcedProcessor#onProcessEvent}.
* Used to define processing behaviour in {@link AbstractEventsourcedProcessor#createOnProcessEvent}.
*/
public final class ProcessBuilder {
private ProcessBuilder() {

private final PFBuilder<Object, Iterable<Object>> underlying;

private ProcessBuilder(PFBuilder<Object, Iterable<Object>> underlying) {
this.underlying = underlying;
}

/**
* Returns a new {@link ProcessBuilder} instance.
*
* @return a {@link ProcessBuilder}
*/
public static ProcessBuilder create() {
return new ProcessBuilder(new PFBuilder<>());
}

/**
* Returns a new {@link PFBuilder} of {@link java.lang.Iterable} with a case statement added.
* Add a new case statement to this builder.
*
* @param type the type to match the argument against
* @param apply the function to apply for the given argument - must return {@link java.lang.Iterable}
* @param apply the function to apply for the given argument - must return {@link Iterable}
* @param <P> the type of the argument
* @return a builder with the case statement added
* @return the {@link ProcessBuilder} with the case statement added
*/
public static <P> PFBuilder<Object, Iterable<Object>> match(final Class<? extends P> type, final IterableApply<? extends P> apply) {
return Match.match(type, apply);
public <P> ProcessBuilder match(final Class<P> type, IterableApply<P> apply) {
underlying.match(type, apply);
return this;
}

/**
* Returns a new {@link PFBuilder} of {@link java.lang.Iterable} with a case statement added.
* Add a new case statement to this builder.
*
* @param type the type to match the argument against
* @param predicate the predicate to match the argument against
* @param apply the function to apply for the given argument - must return {@link java.lang.Iterable}
* @param apply the function to apply for the given argument - must return {@link Iterable}
* @param <P> the type of the argument
* @return a builder with the case statement added
* @return the {@link ProcessBuilder} with the case statement added
*/
public <P> ProcessBuilder match(final Class<P> type, final FI.TypedPredicate<P> predicate, final IterableApply<P> apply) {
underlying.match(type, predicate, apply);
return this;
}

/**
* Add a new case statement to this builder without compile time type check of the parameters.
* Should normally not be used, but when matching on class with generic type.
*
* @param type the type to match the argument against
* @param apply the function to apply for the given argument - must return {@link Iterable}
* @return the {@link ProcessBuilder} with the case statement added
*/
public ProcessBuilder matchUnchecked(final Class<?> type, IterableApply<?> apply) {
underlying.matchUnchecked(type, apply);
return this;
}

/**
* Add a new case statement to this builder without compile time type check of the parameters.
* Should normally not be used, but when matching on class with generic type.
*
* @param type the type to match the argument against
* @param predicate a predicate that will be evaluated on the argument if the type matches
* @param apply the function to apply for the given argument - must return {@link Iterable}
* @return the {@link ProcessBuilder} with the case statement added
*/
public static <P> PFBuilder<Object, Iterable<Object>> match(final Class<? extends P> type,
final FI.TypedPredicate<? extends P> predicate,
final IterableApply<? extends P> apply) {
return Match.match(type, predicate, apply);
public ProcessBuilder matchUnchecked(final Class<?> type, final FI.TypedPredicate<?> predicate, final IterableApply<?> apply) {
underlying.matchUnchecked(type, predicate, apply);
return this;
}

/**
* Returns a new {@link PFBuilder} of {@link java.lang.Iterable} with a case statement added.
* Add a new case statement to this builder.
*
* @param object the object to match the argument against
* @param apply the function to apply for the given argument - must return an {@link java.lang.Iterable}
* @param apply the function to apply for the given argument - must return an {@link Iterable}
* @param <P> the type of the argument
* @return a builder with the case statement added
* @return the {@link ProcessBuilder} with the case statement added
*/
public <P> ProcessBuilder matchEquals(final P object, final IterableApply<P> apply) {
underlying.matchEquals(object, apply);
return this;
}

/**
* Add a new case statement to this builder, that matches any argument.
*
* @param apply the function to apply for the given argument - must return an {@link Iterable}
* @return the {@link ProcessBuilder} with the case statement added
*/
public static <P> PFBuilder<Object, Iterable<Object>> matchEquals(final P object, final IterableApply<P> apply) {
return Match.matchEquals(object, apply);
public ProcessBuilder matchAny(final IterableApply<Object> apply) {
underlying.matchAny(apply);
return this;
}

/**
* Returns a new {@link PFBuilder} of {@link java.lang.Iterable} with a default case statement added.
* Builds the resulting processing behavior as an instance of {@link AbstractEventsourcedProcessor.Process}.
*
* @param apply the function to apply for the given argument - must return an {@link java.lang.Iterable}
* @return a builder with the case statement added
* @return the configured {@link AbstractEventsourcedProcessor.Process}
*/
public static PFBuilder<Object, Iterable<Object>> matchAny(final IterableApply<Object> apply) {
return Match.matchAny(apply);
public AbstractEventsourcedProcessor.Process build() {
return new AbstractEventsourcedProcessor.Process(underlying.build());
}

public interface IterableApply<T> extends FI.Apply<T, Iterable<Object>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.{ Optional => JOption }

import akka.actor.ActorRef

import scala.collection.immutable._
import scala.collection.JavaConverters._
import scala.compat.java8.OptionConverters._

Expand All @@ -32,47 +33,25 @@ private[eventuate] trait EventsourcedProcessorHandlers {
private[eventuate] trait EventSourcedProcessorAdapter extends EventsourcedProcessorHandlers with EventsourcedWriterFailureHandlerAdapter {
this: EventsourcedProcessor =>

type JProcess = PartialFunction[Any, JIterable[Any]]

object JProcess {
object emptyBehavior extends JProcess {
def isDefinedAt(x: Any) = false
def apply(x: Any) = throw new UnsupportedOperationException("Empty process behavior apply()")
}
}

private var processBehaviour: Option[JProcess] = None

abstract override final def processEvent: Process =
onProcessEvent.andThen(seq => seq.asScala.to[collection.immutable.Seq])

/**
* Java API of [[EventsourcedProcessor.processEvent event-processing]] handler.
*
* Returns a partial function that defines the actor's event processing behaviour.
* Returns a [[AbstractEventsourcedProcessor.Process]] that defines the actor's event processing behaviour.
* Use [[ProcessBuilder]] to define the behaviour.
*
* Takes precedence over [[setOnProcessEvent]].
*
* @see [[EventsourcedProcessor]]
*/
def onProcessEvent: JProcess = processBehaviour.getOrElse(JProcess.emptyBehavior)
def createOnProcessEvent(): AbstractEventsourcedProcessor.Process =
AbstractEventsourcedProcessor.emptyProcessBehavior

abstract override final def processEvent: Process =
createOnProcessEvent().asScala

/**
* Java API that sets this actor's [[EventsourcedProcessor.processEvent event-processing]] handler.
*
* Supplied with a partial function that defines the actor's event processing behaviour.
* Use [[ProcessBuilder]] to define the behaviour.
*
* If [[onProcessEvent]] is implemented, the supplied behaviour is ignored.
*
* @param handler This actor's event processing handler.
* @see [[EventsourcedProcessor]]
* creates a new empty [[ProcessBuilder]]
*/
def setOnProcessEvent(handler: JProcess): Unit =
if (processBehaviour.isEmpty) processBehaviour = Some(handler)
else throw new IllegalStateException("Actor process behaviour has already been set with setOnProcessEvent(...). " +
"The behaviour can only be set once.")
final def processBuilder(): ProcessBuilder =
ProcessBuilder.create()
}

private[eventuate] trait EventsourcedProcessorWriteSuccessHandlerAdapter extends EventsourcedWriterSuccessHandlers[Long, Long] {
Expand All @@ -90,13 +69,28 @@ private[eventuate] trait EventsourcedProcessorWriteSuccessHandlerAdapter extends
super.writeSuccess(result)
}

/**
* Java API
*/
object AbstractEventsourcedProcessor {

final class Process(val behaviour: PartialFunction[Any, JIterable[Any]])

final val emptyProcessBehavior: Process = new Process(PartialFunction.empty)

implicit class ProcessConverter(p: Process) {
def asScala: EventsourcedProcessor.Process =
p.behaviour.andThen(seq => seq.asScala.to[Seq])
}
}

/**
* Java API for actors that implement [[EventsourcedProcessor]].
*
* @see [[AbstractEventsourcedView]] for a detailed usage of the Java API
* @see [[EventsourcedProcessor]]
*/
class AbstractEventsourcedProcessor(id: String, eventLog: ActorRef, val targetEventLog: ActorRef) extends AbstractEventsourcedView(id, eventLog)
class AbstractEventsourcedProcessor(val id: String, val eventLog: ActorRef, val targetEventLog: ActorRef) extends AbstractEventsourcedComponent
with EventsourcedProcessor with EventSourcedProcessorAdapter with EventsourcedProcessorWriteSuccessHandlerAdapter {

override final def readSuccess(result: Long): Option[Long] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ import akka.actor.ActorRef
* @see [[AbstractEventsourcedView]] for a detailed usage of the Java API
* @see [[StatefulProcessor]]
*/
class AbstractEventsourcedStatefulProcessor(id: String, eventLog: ActorRef, val targetEventLog: ActorRef) extends AbstractEventsourcedView(id, eventLog)
class AbstractEventsourcedStatefulProcessor(val id: String, val eventLog: ActorRef, val targetEventLog: ActorRef) extends AbstractEventsourcedComponent
with StatefulProcessor with EventSourcedProcessorAdapter with EventsourcedProcessorWriteSuccessHandlerAdapter {
}
Loading

0 comments on commit e41743c

Please sign in to comment.