diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleAgent.java b/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleAgent.java index 194a2ad52b..0a3b9863e9 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleAgent.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModuleAgent.java @@ -2505,7 +2505,8 @@ private int checkNodeControlToggle() serviceCount, ctx.leaderArchiveControlChannel(), ctx.archiveContext().controlRequestStreamId(), - ctx.replicationChannel()); + ctx.replicationChannel(), + ctx.fileSyncLevel()); } NodeControl.ToggleState.reset(nodeControlToggle); @@ -3760,7 +3761,8 @@ private void replicateStandbySnapshotsForStartup() serviceCount, ctx.leaderArchiveControlChannel(), ctx.archiveContext().controlRequestStreamId(), - ctx.replicationChannel())) + ctx.replicationChannel(), + ctx.fileSyncLevel())) { while (!standbySnapshotReplicator.isComplete()) { @@ -3794,7 +3796,8 @@ private int pollStandbySnapshotReplication(final long nowNs) if (standbySnapshotReplicator.isComplete()) { - ctx.snapshotCounter().increment(); + recoveryPlan = recordingLog.createRecoveryPlan(archive, ctx.serviceCount(), Aeron.NULL_VALUE); + ctx.snapshotCounter().incrementOrdered(); CloseHelper.quietClose(standbySnapshotReplicator); standbySnapshotReplicator = null; } diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/StandbySnapshotReplicator.java b/aeron-cluster/src/main/java/io/aeron/cluster/StandbySnapshotReplicator.java index eaaf244a09..5ba74215d2 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/StandbySnapshotReplicator.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/StandbySnapshotReplicator.java @@ -40,6 +40,7 @@ class StandbySnapshotReplicator implements AutoCloseable private final String archiveControlChannel; private final int archiveControlStreamId; private final String replicationChannel; + private final int fileSyncLevel; private final Object2ObjectHashMap errorsByEndpoint = new Object2ObjectHashMap<>(); private MultipleRecordingReplication recordingReplication; private ArrayList snapshotsToReplicate; @@ -53,7 +54,8 @@ class StandbySnapshotReplicator implements AutoCloseable final int serviceCount, final String archiveControlChannel, final int archiveControlStreamId, - final String replicationChannel) + final String replicationChannel, + final int fileSyncLevel) { this.memberId = memberId; this.archive = archive; @@ -62,6 +64,7 @@ class StandbySnapshotReplicator implements AutoCloseable this.archiveControlChannel = archiveControlChannel; this.archiveControlStreamId = archiveControlStreamId; this.replicationChannel = replicationChannel; + this.fileSyncLevel = fileSyncLevel; } static StandbySnapshotReplicator newInstance( @@ -71,7 +74,8 @@ static StandbySnapshotReplicator newInstance( final int serviceCount, final String archiveControlChannel, final int archiveControlStreamId, - final String replicationChannel) + final String replicationChannel, + final int fileSyncLevel) { final AeronArchive archive = AeronArchive.connect(archiveCtx.clone().errorHandler(null)); final StandbySnapshotReplicator standbySnapshotReplicator = new StandbySnapshotReplicator( @@ -81,7 +85,8 @@ static StandbySnapshotReplicator newInstance( serviceCount, archiveControlChannel, archiveControlStreamId, - replicationChannel); + replicationChannel, + fileSyncLevel); archive.context().recordingSignalConsumer(standbySnapshotReplicator::onSignal); return standbySnapshotReplicator; } @@ -163,7 +168,7 @@ int poll(final long nowNs) entry.timestamp, entry.serviceId); } - recordingLog.force(0); + recordingLog.force(fileSyncLevel); CloseHelper.quietClose(recordingReplication); recordingReplication = null; diff --git a/aeron-cluster/src/test/java/io/aeron/cluster/StandbySnapshotReplicatorTest.java b/aeron-cluster/src/test/java/io/aeron/cluster/StandbySnapshotReplicatorTest.java index 8182fbf46d..b2327b573c 100644 --- a/aeron-cluster/src/test/java/io/aeron/cluster/StandbySnapshotReplicatorTest.java +++ b/aeron-cluster/src/test/java/io/aeron/cluster/StandbySnapshotReplicatorTest.java @@ -42,11 +42,7 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.contains; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; class StandbySnapshotReplicatorTest { @@ -57,6 +53,7 @@ class StandbySnapshotReplicatorTest private final String replicationChannel = "aeron:udp?endpoint=host0:0"; private final AeronArchive.Context ctx = new AeronArchive.Context(); private final int memberId = 12; + private final int fileSyncLevel = 1; private final AeronArchive mockArchive = mock(AeronArchive.class); private final MultipleRecordingReplication mockMultipleRecordingReplication0 = mock( @@ -88,7 +85,7 @@ void shouldReplicateStandbySnapshots() when(mockMultipleRecordingReplication0.completedDstRecordingId(anyLong())).thenAnswer( (invocation) -> dstRecordingIds.get(invocation.getArgument(0))); - try (RecordingLog recordingLog = new RecordingLog(clusterDir, true)) + try (RecordingLog recordingLog = spy(new RecordingLog(clusterDir, true))) { recordingLog.appendSnapshot(1, 0, 0, logPositionOldest, 1_000_000_000L, SERVICE_ID); recordingLog.appendSnapshot(2, 0, 0, logPositionOldest, 1_000_000_000L, 0); @@ -118,7 +115,8 @@ void shouldReplicateStandbySnapshots() serviceCount, archiveControlChannel, archiveControlStreamId, - replicationChannel); + replicationChannel, + fileSyncLevel); when(mockMultipleRecordingReplication0.isComplete()).thenReturn(true); @@ -137,6 +135,7 @@ void shouldReplicateStandbySnapshots() verify(mockMultipleRecordingReplication0).addRecording(1L, NULL_RECORDING_ID, NULL_POSITION); verify(mockMultipleRecordingReplication0).addRecording(2L, NULL_RECORDING_ID, NULL_POSITION); verify(mockMultipleRecordingReplication0).poll(nowNs); + verify(recordingLog).force(fileSyncLevel); } } @@ -178,7 +177,8 @@ void shouldPassSignalsToRecordingReplication() 1, archiveControlChannel, archiveControlStreamId, - replicationChannel); + replicationChannel, + fileSyncLevel); standbySnapshotReplicator.poll(0); verify(mockArchive).pollForRecordingSignals(); @@ -209,7 +209,8 @@ void shouldHandleNoStandbySnapshots() 1, archiveControlChannel, archiveControlStreamId, - replicationChannel); + replicationChannel, + fileSyncLevel); standbySnapshotReplicator.poll(0); assertTrue(standbySnapshotReplicator.isComplete()); @@ -261,7 +262,8 @@ void shouldSwitchEndpointsOnMultipleReplicationException() 1, archiveControlChannel, archiveControlStreamId, - replicationChannel); + replicationChannel, + fileSyncLevel); when(mockMultipleRecordingReplication0.poll(anyLong())).thenThrow(new ClusterException("fail")); when(mockMultipleRecordingReplication1.isComplete()).thenReturn(true); @@ -322,7 +324,8 @@ void shouldSwitchEndpointsOnArchivePollForSignalsException() 1, archiveControlChannel, archiveControlStreamId, - replicationChannel); + replicationChannel, + fileSyncLevel); when(mockMultipleRecordingReplication1.isComplete()).thenReturn(true); @@ -384,7 +387,8 @@ void shouldThrowExceptionIfUnableToReplicateAnySnapshotsDueToClusterExceptions() 1, archiveControlChannel, archiveControlStreamId, - replicationChannel); + replicationChannel, + fileSyncLevel); standbySnapshotReplicator.poll(nowNs); standbySnapshotReplicator.poll(nowNs); @@ -441,7 +445,8 @@ void shouldThrowExceptionIfUnableToReplicateAnySnapshotsDueToArchiveExceptions() 1, archiveControlChannel, archiveControlStreamId, - replicationChannel); + replicationChannel, + fileSyncLevel); standbySnapshotReplicator.poll(nowNs); standbySnapshotReplicator.poll(nowNs); @@ -486,7 +491,8 @@ void shouldNotRetrieveSnapshotsIfRecordingLogAlreadyHasUpToDateCopies() 1, archiveControlChannel, archiveControlStreamId, - replicationChannel); + replicationChannel, + fileSyncLevel); standbySnapshotReplicator.poll(nowNs); standbySnapshotReplicator.poll(nowNs);