Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

generate snapshot in background #364

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
[Semantic Versioning](http://semver.org/).

### [Unreleased]
### Added
- Save aggregate snapshots asynchronously in the background when we have a large amount of event on a single stream. Default it 50000. This is configurable via JNDI var snapshot.background.saving.threshold

## [17.100.3] - 2024-11-14
### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.time.ZonedDateTime;
import java.util.UUID;

import org.apache.commons.lang3.SerializationException;
Expand All @@ -18,25 +19,29 @@

public class AggregateSnapshot<T extends Aggregate> implements Serializable {

private static final long serialVersionUID = -6621681121087097921L;

private final UUID streamId;
private final Long positionInStream;
private final String type;
private final byte[] aggregateByteRepresentation;
private final ZonedDateTime createdAt;

@SuppressWarnings("unchecked")
public AggregateSnapshot(final UUID streamId, final Long versionId, final T aggregate) {
this(streamId, versionId, (Class<T>) aggregate.getClass(), serialize(aggregate));
public AggregateSnapshot(final UUID streamId, final Long versionId, final T aggregate, final ZonedDateTime createdAt) {
this(streamId, versionId, (Class<T>) aggregate.getClass(), serialize(aggregate), createdAt);
}

public AggregateSnapshot(final UUID streamId, final Long versionId, final Class<T> type, final byte[] aggregateByteRepresentation) {
this(streamId, versionId, type.getName(), aggregateByteRepresentation);
public AggregateSnapshot(final UUID streamId, final Long versionId, final Class<T> type, final byte[] aggregateByteRepresentation, final ZonedDateTime createdAt) {
this(streamId, versionId, type.getName(), aggregateByteRepresentation, createdAt);
}

public AggregateSnapshot(final UUID streamId, final Long versionId, final String type, final byte[] aggregateByteRepresentation) {
public AggregateSnapshot(final UUID streamId, final Long versionId, final String type, final byte[] aggregateByteRepresentation, final ZonedDateTime createdAt) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the aggregateByteRepresentation byte array and createdAt date are linked, should they be but into a new single Object, AggregateByteArray or something. Also, if it's not always been created, should that Object be Optional?

this.streamId = streamId;
this.positionInStream = versionId;
this.type = type;
this.aggregateByteRepresentation = aggregateByteRepresentation;
this.createdAt = createdAt;
}

public UUID getStreamId() {
Expand All @@ -55,12 +60,16 @@ public byte[] getAggregateByteRepresentation() {
return aggregateByteRepresentation;
}

public ZonedDateTime getCreatedAt() {
return createdAt;
}

@SuppressWarnings("unchecked")
public T getAggregate(final ObjectInputStreamStrategy streamStrategy) throws AggregateChangeDetectedException {
try (final ObjectInputStream objectInputStream = streamStrategy.objectInputStreamOf(new ByteArrayInputStream(aggregateByteRepresentation))) {
return (T) Class.forName(getType()).cast(objectInputStream.readObject());
} catch (SerializationException | ClassNotFoundException | IOException e) {
throw new AggregateChangeDetectedException(format("Failed to deserialise Aggregate into %s. Cause: %s", type, e.getLocalizedMessage()));
throw new AggregateChangeDetectedException(format("Failed to deserialise Aggregate into %s. Cause: %s", type, e.getLocalizedMessage()), positionInStream, createdAt);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
package uk.gov.justice.services.core.aggregate.exception;

import java.time.ZonedDateTime;

public class AggregateChangeDetectedException extends Exception {

private static final long serialVersionUID = 5934757852541650746L;

public AggregateChangeDetectedException(final String message) {
private long positionInStream;
private ZonedDateTime createdAt;

public AggregateChangeDetectedException(final String message, final long positionInStream, final ZonedDateTime createdAt) {
super(message);
this.positionInStream = positionInStream;
this.createdAt = createdAt;
}

public long getPositionInStream() {
return positionInStream;
}

public ZonedDateTime getCreatedAt() {
return createdAt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class AggregateSnapshotTest {
public void shouldCreateAnAggregateSnapshot() throws Exception {
final TestAggregate aggregate = new TestAggregate("STATE1");

final AggregateSnapshot<TestAggregate> snapshot = new AggregateSnapshot<>(STREAM_ID, VERSION_ID, aggregate);
final AggregateSnapshot<TestAggregate> snapshot = new AggregateSnapshot<>(STREAM_ID, VERSION_ID, aggregate, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I like passing in null like this. I've found nulls usually lead to tears and pain. Can this be an Optional instead?


assertThat(snapshot.getStreamId(), is(STREAM_ID));
assertThat(snapshot.getPositionInStream(), is(VERSION_ID));
Expand All @@ -38,7 +38,7 @@ public void shouldCreateAnAggregateSnapshot() throws Exception {
public void shouldGetAnAggregateSnapshot() throws Exception {
final TestAggregate aggregate = new TestAggregate("STATE1");

final AggregateSnapshot<TestAggregate> snapshot = new AggregateSnapshot<>(STREAM_ID, VERSION_ID, aggregate);
final AggregateSnapshot<TestAggregate> snapshot = new AggregateSnapshot<>(STREAM_ID, VERSION_ID, aggregate, null);

assertThat(snapshot.getStreamId(), is(STREAM_ID));
assertThat(snapshot.getPositionInStream(), is(VERSION_ID));
Expand All @@ -55,7 +55,8 @@ public void shouldThrowAAggregateChangeDetectedExceptionIfTheAggregateCannotBeDe
STREAM_ID,
VERSION_ID,
TestAggregate.class,
aggregate);
aggregate,
null);

try {
aggregateSnapshot.getAggregate(streamStrategy);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package uk.gov.justice.services.eventsourcing.jdbc.snapshot;

import static java.lang.String.format;
import static uk.gov.justice.services.common.converter.ZonedDateTimes.fromSqlTimestamp;
import static uk.gov.justice.services.common.converter.ZonedDateTimes.toSqlTimestamp;

import uk.gov.justice.domain.aggregate.Aggregate;
Expand All @@ -13,6 +14,8 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.UUID;

Expand All @@ -33,12 +36,16 @@ public class SnapshotJdbcRepository implements SnapshotRepository {
private static final String COL_VERSION_ID = "version_id";
private static final String COL_TYPE = "type";
private static final String COL_AGGREGATE = "aggregate";
private static final String COL_CREATED_AT = "created_at";
private static final String SQL_FIND_LATEST_BY_STREAM_ID = "SELECT * FROM snapshot WHERE stream_id=? AND type=? ORDER BY version_id DESC";
private static final String SQL_INSERT_EVENT_LOG = "INSERT INTO snapshot (stream_id, version_id, type, aggregate, created_at ) VALUES(?, ?, ?, ?, ?)";
private static final String SQL_UPSERT_SNAPSHOT = "INSERT INTO snapshot AS s (stream_id, version_id, type, aggregate, created_at ) VALUES(?, ?, ?, ?, ?) ON CONFLICT ON CONSTRAINT stream_id_version_id_type DO UPDATE SET aggregate =?, created_at = ? WHERE s.created_at<=?";
private static final String DELETE_ALL_SNAPSHOTS_FOR_STREAM_ID_AND_CLASS = "delete from snapshot where stream_id =? and type=?";
private static final String DELETE_ALL_SNAPSHOTS_OF_STREAM_ID_AND_CLASS_AND_LESS_THAN_POSITION_IN_STREAM = "delete from snapshot where stream_id =? and type=? and version_id<?";
private static final String SQL_CURRENT_SNAPSHOT_VERSION_ID = "SELECT version_id FROM snapshot WHERE stream_id=? AND type=? ORDER BY version_id DESC";

// using 'stream_id =? and type=? and created_at <=?' may give better outcome and is backward compatible with the current behavious
private static final String DELETE_SNAPSHOT = "delete from snapshot where stream_id =? and type=? and version_id =? and created_at=? ";

@Inject
private EventStoreDataSourceProvider eventStoreDataSourceProvider;

Expand All @@ -52,12 +59,18 @@ public class SnapshotJdbcRepository implements SnapshotRepository {
public boolean storeSnapshot(final AggregateSnapshot aggregateSnapshot) {

try (final Connection connection = eventStoreDataSourceProvider.getDefaultDataSource().getConnection();
final PreparedStatement ps = connection.prepareStatement(SQL_INSERT_EVENT_LOG)) {
final PreparedStatement ps = connection.prepareStatement(SQL_UPSERT_SNAPSHOT)) {
final Timestamp sqlTimestamp = toSqlTimestamp(clock.now());

ps.setObject(1, aggregateSnapshot.getStreamId());
ps.setLong(2, aggregateSnapshot.getPositionInStream());
ps.setString(3, aggregateSnapshot.getType());
ps.setBytes(4, aggregateSnapshot.getAggregateByteRepresentation());
ps.setTimestamp(5, toSqlTimestamp(clock.now()));
ps.setTimestamp(5, sqlTimestamp);
ps.setBytes(6, aggregateSnapshot.getAggregateByteRepresentation());
ps.setTimestamp(7, sqlTimestamp);
ps.setTimestamp(8, sqlTimestamp);

ps.executeUpdate();

return true;
Expand Down Expand Up @@ -97,6 +110,21 @@ public <T extends Aggregate> void removeAllSnapshots(final UUID streamId, final
}
}

@Override
public <T extends Aggregate> int removeSnapshot(final UUID streamId, final Class<T> clazz, final long positionInStream, final ZonedDateTime createdAt) {
try (final Connection connection = eventStoreDataSourceProvider.getDefaultDataSource().getConnection();
final PreparedStatement ps = connection.prepareStatement(DELETE_SNAPSHOT)) {
ps.setObject(1, streamId);
ps.setString(2, clazz.getName());
ps.setLong(3, positionInStream);
ps.setTimestamp(4, toSqlTimestamp(createdAt));
return ps.executeUpdate();
} catch (final SQLException e) {
logger.error("Exception while removing snapshots %s of stream %s".formatted(clazz, streamId), e);
}
return 0;
}

@Override
public <T extends Aggregate> void removeAllSnapshotsOlderThan(final AggregateSnapshot aggregateSnapshot) {
try (final Connection connection = eventStoreDataSourceProvider.getDefaultDataSource().getConnection();
Expand Down Expand Up @@ -134,7 +162,8 @@ public AggregateSnapshot entityFrom(final ResultSet resultSet) throws SQLExcepti
(UUID) resultSet.getObject(COL_STREAM_ID),
resultSet.getLong(COL_VERSION_ID),
resultSet.getString(COL_TYPE),
resultSet.getBytes(COL_AGGREGATE));
resultSet.getBytes(COL_AGGREGATE),
fromSqlTimestamp(resultSet.getTimestamp(COL_CREATED_AT)));
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import uk.gov.justice.domain.aggregate.Aggregate;
import uk.gov.justice.domain.snapshot.AggregateSnapshot;

import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.UUID;

Expand Down Expand Up @@ -41,6 +42,17 @@ public interface SnapshotRepository {
<T extends Aggregate> void removeAllSnapshots(final UUID streamId, final Class<T> clazz);


/**
* Remove all snapshots older than a given date.
*
* @param <T> the type parameter
* @param streamId the stream id
* @param clazz the clazz
* @param createdAt
*/
<T extends Aggregate> int removeSnapshot(final UUID streamId, final Class<T> clazz, final long positionInStream, final ZonedDateTime createdAt);


/**
* Remove all snapshots older than given aggregateSnapshot
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static uk.gov.justice.services.common.converter.ZonedDateTimes.toSqlTimestamp;

Expand Down Expand Up @@ -97,16 +98,29 @@ public void shouldStoreAndRetrieveSnapshot() throws Exception {
}

@Test
public void shouldIgnoreFailureOnStoreAndLogError() throws Exception {
public void shouldUpsert() throws Exception {
when(clock.now()).thenReturn(now);
final UUID streamId = randomUUID();
final AggregateSnapshot aggregateSnapshot = createSnapshot(streamId, VERSION_ID, TYPE, AGGREGATE);
snapshotJdbcRepository.storeSnapshot(aggregateSnapshot);

final boolean snapshotStored = snapshotJdbcRepository.storeSnapshot(aggregateSnapshot);

assertThat(snapshotStored, is(true));
}

@Test
public void shouldIgnoreFailureOnStoreAndLogError() throws Exception {
final SQLException exception = new SQLException("Failed to get connection");
final DataSource ds = mock(DataSource.class);
when(eventStoreDataSourceProvider.getDefaultDataSource()).thenReturn(ds);
when(ds.getConnection()).thenThrow(exception);
final UUID streamId = randomUUID();
final AggregateSnapshot aggregateSnapshot = createSnapshot(streamId, VERSION_ID, TYPE, AGGREGATE);

final boolean snapshotStored = snapshotJdbcRepository.storeSnapshot(aggregateSnapshot);
assertFalse(snapshotStored);
verify(logger).error(eq("Error while storing a snapshot for {} at version {}"), eq(streamId), eq( VERSION_ID), ArgumentMatchers.any(Throwable.class));
verify(logger).error(eq("Error while storing a snapshot for {} at version {}"), eq(streamId), eq(VERSION_ID), ArgumentMatchers.any(Throwable.class));
}

@Test
Expand Down Expand Up @@ -204,6 +218,44 @@ public void shouldRemoveOlderSnapshotsThanGivenSnapshot() throws Exception {
assertThat(fetchedSnapshots, hasItems(snapshot3, snapshot4, snapshot5));
}

@Test
public void shouldRemoveSnapshot() throws Exception {
when(clock.now()).thenReturn(now);
final UUID streamId = randomUUID();

final AggregateSnapshot snapshot1 = createSnapshot(streamId, 1L, TYPE, AGGREGATE);
final AggregateSnapshot snapshot2 = createSnapshot(streamId, 2L, TYPE, AGGREGATE);
final AggregateSnapshot snapshot3 = createSnapshot(streamId, 3L, TYPE, AGGREGATE);

snapshotJdbcRepository.storeSnapshot(snapshot1);
snapshotJdbcRepository.storeSnapshot(snapshot2);
snapshotJdbcRepository.storeSnapshot(snapshot3);

final int count = snapshotJdbcRepository.removeSnapshot(snapshot2.getStreamId(), TYPE, snapshot2.getPositionInStream(), now);
assertThat(count, is(1));
final List<AggregateSnapshot> fetchedSnapshots = fetchAllSnapshotsFromDb();
assertThat(fetchedSnapshots.size(), is(2));
assertThat(fetchedSnapshots, hasItems(snapshot1, snapshot3));
verifyNoMoreInteractions(logger);
}

@Test
public void removeSnapshotShouldReturnZeroOnSQLException() throws Exception {
final SQLException exception = new SQLException("Failed to get connection");
final DataSource ds = mock(DataSource.class);
when(eventStoreDataSourceProvider.getDefaultDataSource()).thenReturn(ds);
when(ds.getConnection()).thenThrow(exception);

final UUID streamId = randomUUID();

final AggregateSnapshot snapshot1 = createSnapshot(streamId, 1L, TYPE, AGGREGATE);

final int count = snapshotJdbcRepository.removeSnapshot(snapshot1.getStreamId(), TYPE, snapshot1.getPositionInStream(), now);
assertThat(count, is(0));
verify(logger).error("Exception while removing snapshots %s of stream %s".formatted(TYPE, streamId), exception);
verifyNoMoreInteractions(logger);
}

@Test
public void shouldLogErrorAndIgnoreAnyFailuresWhileRemovingOldSnapshots() throws Exception {
//There is no easy way to reproduce sqlexception and had to use chain of mocks
Expand Down Expand Up @@ -243,7 +295,7 @@ public void shouldRetrieveOptionalNullIfOnlySnapshotsOfDifferentTypesAvailable()

}

private void removeAllSnapshots() throws Exception {
private void removeAllSnapshots() throws Exception {
try (final Connection connection = eventStoreDataSourceProvider.getDefaultDataSource().getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(REMOVE_ALL_SNAPSHOTS_SQL)) {
preparedStatement.executeUpdate();
Expand All @@ -255,7 +307,7 @@ private Optional<Timestamp> findSnapshotCreatedAt(UUID streamId, Long versionId)
final PreparedStatement ps = connection.prepareStatement(FIND_CREATED_TIME_BY_VERSION_ID)) {
ps.setObject(1, streamId);
ps.setLong(2, versionId);
try(final ResultSet rs = ps.executeQuery()) {
try (final ResultSet rs = ps.executeQuery()) {
return rs.next() ? Optional.of(rs.getTimestamp("created_at")) : Optional.empty();
}
}
Expand All @@ -265,8 +317,8 @@ private List<AggregateSnapshot> fetchAllSnapshotsFromDb() throws SQLException {
final List<AggregateSnapshot> fetchedSnapshots = new ArrayList<>();
try (final Connection connection = eventStoreDataSourceProvider.getDefaultDataSource().getConnection();
final PreparedStatement preparedStatement = connection.prepareStatement(FETCH_ALL_SNAPSHOTS_QUERY)) {
try(final ResultSet rs = preparedStatement.executeQuery()) {
while(rs.next()) {
try (final ResultSet rs = preparedStatement.executeQuery()) {
while (rs.next()) {
fetchedSnapshots.add(snapshotJdbcRepository.entityFrom(rs));
}
}
Expand All @@ -277,7 +329,7 @@ private List<AggregateSnapshot> fetchAllSnapshotsFromDb() throws SQLException {

@SuppressWarnings("unchecked")
private <T extends Aggregate> AggregateSnapshot createSnapshot(final UUID streamId, final Long sequenceId, Class<T> type, byte[] aggregate) {
return new AggregateSnapshot(streamId, sequenceId, type, aggregate);
return new AggregateSnapshot(streamId, sequenceId, type, aggregate, null);
}

public class RecordingAggregate implements Aggregate {
Expand Down
2 changes: 1 addition & 1 deletion aggregate-snapshot/aggregate-snapshot-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
</dependency>
<dependency>
<groupId>uk.gov.justice.services</groupId>
<artifactId>aggregate</artifactId>
<artifactId>core</artifactId>
</dependency>
<dependency>
<groupId>uk.gov.justice.event-store</groupId>
Expand Down
Loading