Rework domain event handling #79

Open
@marein

Add more type safety for subscribers within a context. A StoredEvent should carry its DomainEvent, which can then be read without the JSON or array<string, mixed> baggage.

This refactoring should be divided into smaller steps. Unless the changes are big, each can be done horizontally for all contexts:

  • Consider making DomainEvent generic, i.e. DomainEvent<T of object>.
  • Add time-based GapDetection. Especially useful for reprocessing the full stream: there, we know that no transaction is pending anymore once the next event is older than, e.g., 5 minutes.
  • Add DoctrineTransientEventStore which deletes the events during streaming after they've been committed. Useful when it's only used as an outbox and no replay is needed. Although already possible with the CleanableEventStore, with this solution there's no need to keep track of the pointer.
  • EventStorePointer and StoredEvent should work with a resumeToken or pointer of type string instead of an id of type int. In abstract terms, that's exactly what it is. This means that these types can be used with a wider range of technologies, such as MongoDB's Change Streams or Redis' Streams. With that, a StreamableEventStore::streamFrom(string $token, int $batchSize): Stream interface could be used. A StoredEvent wouldn't be needed in that design and PollableEventStore can be removed.
  • Take a final look and rename types, e.g. FollowEventStoreDispatcher could be named after the use case, PollEventStore or DispatchEvents. Files could also be namespaced, e.g. everything related to EventStorePointer.
  • Tail the transaction log instead of polling the event store table in at least one context (with row replication).
    Note: This will add complexity to the project. A comment on the PollableEventStore interface noting which other options exist should be enough. Although polling adds a bit of latency, it scales well enough.
  • Serialize / Deserialize the whole event into an event column and prepare StoredEvent to carry the DomainEvent.
    StoredEvent carries DomainEvent #83
  • Prepare each StoredEventSubscriber to read from DomainEvent instead of StoredEvent.
    Use DomainEvent in subscribers #84
  • Remove DomainEvent::occurredOn() and push this information to StoredEvent::occurredOn(). This removes the global clock dependency. If the time is important to the domain, it should be modeled explicitly, e.g. how it is done in the chat context MessageWritten:writtenAt.
    Move occurredOn to StoredEvent #85
  • Remove all other occurrences of DomainEvent::payload() etc., e.g. in tests.
    Move occurredOn to StoredEvent #85
    Remove deprecations in event handling #86
  • Revisit (to potentially remove) database fields from event store.
    Remove deprecations in event handling #86
  • Remove StoredEventSubscriber::isSubscribedTo() and DomainEventSubscriber::isSubscribedTo()? These look like unnecessary function calls. As part of the work on this issue, they already always return true. If a subscriber doesn't handle something, it can return early (at least as it's currently implemented). Additionally, remove all remaining deprecated functions that were introduced by this issue.
    Remove deprecations in event handling #86
  • Decouple StoredEventSubscribers from StoredEvent. StoredEvent is an internal structure.
    Decouple subscribers from StoredEvent #229
  • Add a local sequence number for DomainEvents. With that change, the EventStore can be used with an event-sourced model, and consumers from messaging technologies with competing consumers on a logical unit can use it to handle out-of-order messages. Additionally, together with the aggregate id, it can be used for idempotency in consumers that cannot be naturally idempotent.
    Add DomainEvent version #230
  • Add the ability to attach custom headers to a DomainEvent. This can be used for auditing, to track the recording time (which would make occurredOn on StoredEvent unnecessary) or to apply a unique event id (for idempotency in consumers that cannot be naturally idempotent).
    Decouple subscribers from StoredEvent #229 (sneaked in the removal of occurredOn)
    Add DomainEvent version #230 (sneaked in)
  • Decouple domain code from DomainEvent. The domain code should decide for itself whether it wants to add the dependency or not.
    Add DomainEvent version #230
  • EventStore::append() should accept multiple events. This allows for combined insert statements, avoiding roundtrips to the database.
    Speed up group inserts to EventStore #228
  • Segregate EventStore interface. It has two responsibilities. First, it's used to append or fetch data based on an aggregate. Second, it's used to poll the event store to feed stored event subscribers. Not every implementation needs that, as other implementations might tail the transaction log.
    Segregate EventStore interface #140
    Fix duplicate method call #145
    • EventStore should only depend on DomainEvent. After the interface segregation, there's no need for EventStore to depend on StoredEvent.
      EventStore depends only on DomainEvent #180
    • Make PollableEventStore::since() a constant stream (via iterator)? -> No, only with workarounds such as yielding a value that indicates that nothing could be fetched in order to move the event store pointer. That doesn't sound straightforward just to gain a few ms.
  • Use publisher confirms to publish events to RabbitMQ.
    Use publisher confirms in connect four #103
    Use publisher confirms in chat #135
    Use publisher confirms in identity #136
    Fix documentation #160
  • The latency of the polling mechanism could be reduced by a good number of milliseconds. The process that pushes fetched events to workers could send a commit after each batch without awaiting the acknowledgment, and only await the oldest commit once a certain number of commits have piled up. However, the current performance is good enough (feels real-time in the browser), and it scales with the number of shards (10k-20k events/s per shard, depending on the subscriber code). As this change would make the processes even harder to understand, it will not be made for the time being.
  • Parallelize stored event subscribers.
    Parallelize event store follower #98
    Parallelize event store follower #99
  • Create an example to poll from a read replica. -> Not needed. If polling the database ever became a bottleneck (e.g. because of too many read models, each built in its own process), there's still the transaction log as well as streaming technologies.
  • Graceful shutdown for event store followers, see Parallelize event store follower #99 (comment).
    Shut down event store dispatcher gracefully #113
  • Make events deletable via an additional interface, CleanableEventStore or DeletableEventStore, and a use case that cleans up all events up to the lowest value among a given set of EventStorePointers. Not all contexts need to store all events at all times, e.g. the chat context.
    Provide CleanableEventStore #227
  • Fix the doctrine migration. The field id should be an unsigned bigint.
    Provide CleanableEventStore #227 (sneaked in)
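
The time-based GapDetection item above could work roughly as sketched below. This is only an illustration under assumptions: the function name lastSafeId, the array shape of the batch and the default of 300 seconds are hypothetical and not taken from the project. A follower stops at a gap in the ids unless the event after the gap is old enough that no transaction holding the missing id can still be pending.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical sketch of time-based gap detection (names are assumptions).
 *
 * @param int $pointer id of the last processed event
 * @param list<array{id: int, occurredOn: DateTimeImmutable}> $batch ordered by id
 * @return int id up to which the follower may safely advance
 */
function lastSafeId(
    int $pointer,
    array $batch,
    DateTimeImmutable $now,
    int $gapTimeoutSeconds = 300
): int {
    $safe = $pointer;

    foreach ($batch as $event) {
        $hasGap = $event['id'] !== $safe + 1;
        $ageInSeconds = $now->getTimestamp() - $event['occurredOn']->getTimestamp();

        if ($hasGap && $ageInSeconds < $gapTimeoutSeconds) {
            // A transaction that reserved the missing id might still commit,
            // so stop here and retry on the next poll.
            break;
        }

        // Either there is no gap, or the event after the gap is old enough
        // to assume that the missing id will never show up.
        $safe = $event['id'];
    }

    return $safe;
}
```

When reprocessing the full stream, most events are far older than the timeout, so old gaps are skipped immediately and the follower only waits near the head of the stream.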

As always, changes can be breaking, as this is an "always green field" app. So, ./project build is required. Otherwise, the work would need to be divided into even more steps, including database migrations and the like.
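
To make the resumeToken idea from the list more concrete, here is a minimal sketch of a token-based StreamableEventStore. It is not the project's implementation: the issue names a Stream return type that isn't specified further, so iterable stands in for it, and InMemoryStreamableEventStore is a hypothetical class that exists only for illustration. The point is that callers treat the token as an opaque string, while each store decides what it encodes.

```php
<?php
declare(strict_types=1);

/** Assumed shape; `iterable` stands in for the unspecified `Stream` type. */
interface StreamableEventStore
{
    /** @return iterable<string, object> resume token => domain event */
    public function streamFrom(string $token, int $batchSize): iterable;
}

/** Hypothetical in-memory store, for illustration only. */
final class InMemoryStreamableEventStore implements StreamableEventStore
{
    /** @param list<object> $events */
    public function __construct(private array $events)
    {
    }

    public function streamFrom(string $token, int $batchSize): iterable
    {
        // The token is opaque to callers. This store happens to encode the
        // number of events already consumed; an empty token means "from start".
        $offset = $token === '' ? 0 : (int) $token;

        foreach (array_slice($this->events, $offset, $batchSize) as $index => $event) {
            // Yield the token to resume from *after* this event.
            yield (string) ($offset + $index + 1) => $event;
        }
    }
}
```

A follower would persist the last yielded token and pass it to the next streamFrom() call. A store backed by MongoDB Change Streams or Redis Streams could return its native resume token or entry id in the same position.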
