Skip to content

Commit e4a5119

Browse files
committed
expose create cookie delegate
1 parent ad81aa4 commit e4a5119

File tree

5 files changed

+59
-37
lines changed

5 files changed

+59
-37
lines changed

libs/cluster/Server/Replication/ReplicationLogCheckpointManager.cs

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,55 @@
1111

1212
namespace Garnet.cluster
1313
{
14-
internal sealed class ReplicationLogCheckpointManager(
15-
INamedDeviceFactoryCreator deviceFactoryCreator,
16-
ICheckpointNamingScheme checkpointNamingScheme,
17-
bool isMainStore,
18-
bool removeOutdated = false,
19-
int fastCommitThrottleFreq = 0,
20-
ILogger logger = null) : DeviceLogCommitCheckpointManager(deviceFactoryCreator, checkpointNamingScheme, removeOutdated: false, fastCommitThrottleFreq, logger), IDisposable
14+
internal sealed class ReplicationLogCheckpointManager : DeviceLogCommitCheckpointManager, IDisposable
2115
{
2216
public long CurrentSafeAofAddress = 0;
2317
public long RecoveredSafeAofAddress = 0;
2418

2519
public string CurrentReplicationId = string.Empty;
2620
public string RecoveredReplicationId = string.Empty;
2721

28-
readonly bool isMainStore = isMainStore;
22+
readonly bool isMainStore;
2923
public Action<bool, long, long, bool> checkpointVersionShiftStart;
3024
public Action<bool, long, long, bool> checkpointVersionShiftEnd;
3125

32-
readonly bool safelyRemoveOutdated = removeOutdated;
26+
readonly bool safelyRemoveOutdated;
3327

34-
readonly ILogger logger = logger;
28+
readonly ILogger logger;
29+
30+
public ReplicationLogCheckpointManager(
31+
INamedDeviceFactoryCreator deviceFactoryCreator,
32+
ICheckpointNamingScheme checkpointNamingScheme,
33+
bool isMainStore,
34+
bool safelyRemoveOutdated = false,
35+
int fastCommitThrottleFreq = 0,
36+
ILogger logger = null)
37+
: base(deviceFactoryCreator, checkpointNamingScheme, removeOutdated: false, fastCommitThrottleFreq, logger)
38+
{
39+
this.isMainStore = isMainStore;
40+
this.safelyRemoveOutdated = safelyRemoveOutdated;
41+
this.createCookieDelegate = CreateCookie;
42+
this.logger = logger;
43+
44+
// Pre-append cookie in commitMetadata.
45+
// cookieMetadata 52 bytes
46+
// 1. 4 bytes to track size of cookie
47+
// 2. 8 bytes for checkpointCoveredAddress
48+
// 3. 40 bytes for primaryReplicationId
49+
unsafe byte[] CreateCookie()
50+
{
51+
var cookie = new byte[sizeof(int) + sizeof(long) + CurrentReplicationId.Length];
52+
var primaryReplIdBytes = Encoding.ASCII.GetBytes(CurrentReplicationId);
53+
fixed (byte* ptr = cookie)
54+
fixed (byte* pridPtr = primaryReplIdBytes)
55+
{
56+
*(int*)ptr = sizeof(long) + CurrentReplicationId.Length;
57+
*(long*)(ptr + 4) = CurrentSafeAofAddress;
58+
Buffer.MemoryCopy(pridPtr, ptr + 12, primaryReplIdBytes.Length, primaryReplIdBytes.Length);
59+
}
60+
return cookie;
61+
}
62+
}
3563

3664
public override void CheckpointVersionShiftStart(long oldVersion, long newVersion, bool isStreaming)
3765
=> checkpointVersionShiftStart?.Invoke(isMainStore, oldVersion, newVersion, isStreaming);
@@ -63,31 +91,6 @@ public IDevice GetDevice(CheckpointFileType retStateType, Guid fileToken)
6391

6492
#region ICheckpointManager
6593

66-
/// <summary>
67-
/// Pre-append cookie in commitMetadata.
68-
/// cookieMetadata 52 bytes
69-
/// 1. 4 bytes to track size of cookie
70-
/// 2. 8 bytes for checkpointCoveredAddress
71-
/// 3. 40 bytes for primaryReplicationId
72-
/// </summary>
73-
/// <returns></returns>
74-
private unsafe byte[] CreateCookie()
75-
{
76-
var cookie = new byte[sizeof(int) + sizeof(long) + CurrentReplicationId.Length];
77-
var primaryReplIdBytes = Encoding.ASCII.GetBytes(CurrentReplicationId);
78-
fixed (byte* ptr = cookie)
79-
fixed (byte* pridPtr = primaryReplIdBytes)
80-
{
81-
*(int*)ptr = sizeof(long) + CurrentReplicationId.Length;
82-
*(long*)(ptr + 4) = CurrentSafeAofAddress;
83-
Buffer.MemoryCopy(pridPtr, ptr + 12, primaryReplIdBytes.Length, primaryReplIdBytes.Length);
84-
}
85-
return cookie;
86-
}
87-
88-
/// <inheritdoc />
89-
public override byte[] GetCookie() => CreateCookie();
90-
9194
private HybridLogRecoveryInfo ConverMetadata(byte[] checkpointMetadata)
9295
{
9396
var success = true;

libs/storage/Tsavorite/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ public class DeviceLogCommitCheckpointManager : ILogCommitManager, ICheckpointMa
3131
protected readonly ICheckpointNamingScheme checkpointNamingScheme;
3232
private readonly SemaphoreSlim semaphore;
3333

34+
protected Func<byte[]> createCookieDelegate;
35+
3436
private readonly bool removeOutdated;
3537
private SectorAlignedBufferPool bufferPool;
3638

@@ -173,7 +175,10 @@ public byte[] GetCommitMetadata(long commitNum)
173175
#region ICheckpointManager
174176

175177
/// <inheritdoc />
176-
public virtual byte[] GetCookie() => null;
178+
public void AddCookieDelegate(Func<byte[]> createCookieAction) => this.createCookieDelegate = createCookieAction;
179+
180+
/// <inheritdoc />
181+
public byte[] GetCookie() => createCookieDelegate == null ? null : createCookieDelegate();
177182

178183
/// <inheritdoc />
179184
public unsafe void CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata)

libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/TsavoriteStateMachineProperties.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4+
using System;
5+
46
namespace Tsavorite.core
57
{
68
public partial class TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator> : TsavoriteBase
@@ -9,6 +11,12 @@ public partial class TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator> : Ts
911
{
1012
internal long lastVersion;
1113

14+
/// <summary>
15+
/// Add cookie delegate
16+
/// </summary>
17+
/// <param name="cookieCreationDelegate"></param>
18+
public void AddCookieDelegate(Func<byte[]> cookieCreationDelegate) => checkpointManager.AddCookieDelegate(cookieCreationDelegate);
19+
1220
private byte[] recoveredCommitCookie;
1321
/// <summary>
1422
/// User-specified commit cookie persisted with last recovered commit

libs/storage/Tsavorite/cs/src/core/Index/Recovery/ICheckpointManager.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ namespace Tsavorite.core
3030
/// </summary>
3131
public interface ICheckpointManager : IDisposable
3232
{
33+
/// <summary>
34+
/// Add create cookie action
35+
/// </summary>
36+
/// <param name="createCookieAction"></param>
37+
void AddCookieDelegate(Func<byte[]> createCookieAction);
38+
3339
/// <summary>
3440
/// Get current cookie
3541
/// </summary>

libs/storage/Tsavorite/cs/test/SimpleRecoveryTest.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ private async ValueTask SimpleRecoveryTest1_Worker(CheckpointType checkpointType
149149
}
150150

151151
if (testCommitCookie)
152-
store1.AddCookie(commitCookie);
152+
store1.AddCookieDelegate(() => commitCookie);
153153
_ = store1.TryInitiateFullCheckpoint(out Guid token, checkpointType);
154154
if (completionSyncMode == CompletionSyncMode.Sync)
155155
store1.CompleteCheckpointAsync().AsTask().GetAwaiter().GetResult();

0 commit comments

Comments
 (0)