Skip to content

Commit 52eba4d

Browse files
vazoisCopilot
andauthored
Skip Value Expiration Check When Scanning a Tombstoned Record (#1612)
* fix migrate write test * prevent expiration check on tombstoned key while scanning * fix formatting * ensure reviv pause signal is observed through epoch protection * make revivPauseEvent readonly * Update test/Garnet.test.cluster/ClusterMigrateTests.cs Co-authored-by: Copilot <[email protected]> * addressing first comments * update comment in MigrateScanFunctions * release epoch when acquiring exlucisve SuspendConfigMerge lock * add ReaderWriterLock custom implementation * fix formatting * fixing simple tests * make pause reviv thread safe --------- Co-authored-by: Copilot <[email protected]>
1 parent 638b55b commit 52eba4d

File tree

9 files changed

+452
-78
lines changed

9 files changed

+452
-78
lines changed

libs/cluster/Server/ClusterManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public ClusterManager(ClusterProvider clusterProvider, ILogger logger = null)
105105
gossipDelay = TimeSpan.FromSeconds(serverOptions.GossipDelay);
106106
clusterTimeout = serverOptions.ClusterTimeout <= 0 ? Timeout.InfiniteTimeSpan : TimeSpan.FromSeconds(serverOptions.ClusterTimeout);
107107
numActiveTasks = 0;
108+
activeMergeLock = new();
108109
GossipSamplePercent = serverOptions.GossipSamplePercent;
109110

110111
// Run Background task
@@ -141,7 +142,7 @@ async Task FlushTask()
141142
public void Dispose()
142143
{
143144
DisposeBackgroundTasks();
144-
145+
activeMergeLock?.Dispose();
145146
clusterConfigDevice.Dispose();
146147
pool.Free();
147148
epoch?.Dispose();

libs/cluster/Server/Gossip/Gossip.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ internal sealed partial class ClusterManager : IDisposable
1919
public readonly TimeSpan gossipDelay;
2020
public readonly TimeSpan clusterTimeout;
2121
private volatile int numActiveTasks = 0;
22-
private SingleWriterMultiReaderLock activeMergeLock;
22+
private readonly common.ReaderWriterLock activeMergeLock;
2323
public readonly GarnetClusterConnectionStore clusterConnectionStore;
2424

2525
public GossipStats gossipStats;

libs/cluster/Server/Migration/MigrateScanFunctions.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ public unsafe bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMeta
3333
mss.ThrowIfCancelled();
3434

3535
// Do not send key if it is expired
36-
if (ClusterSession.Expired(ref value))
36+
// NOTE: Because the scan executes includingTombstones, tombstone records may not have valid expiration metadata; skip expiration validation here and defer it until the actual send occurs (MigrateSessionCommonUtils.cs:WriteOrSendMainStoreKeyValuePair).
37+
if (!recordMetadata.RecordInfo.Tombstone && ClusterSession.Expired(ref value))
3738
return true;
3839

3940
// TODO: Some other way to detect namespaces
@@ -99,7 +100,8 @@ public unsafe bool SingleReader(ref byte[] key, ref IGarnetObject value, RecordM
99100
mss.ThrowIfCancelled();
100101

101102
// Do not send key if it is expired
102-
if (ClusterSession.Expired(ref value))
103+
// NOTE: Because the scan executes includingTombstones, tombstone records may not have valid expiration metadata; skip expiration validation here and defer it until the actual send occurs (MigrateSessionCommonUtils.cs:WriteOrSendObjectStoreKeyValuePair).
104+
if (!recordMetadata.RecordInfo.Tombstone && ClusterSession.Expired(ref value))
103105
return true;
104106

105107
var s = HashSlotUtils.HashSlot(key);

libs/cluster/Server/Migration/MigrationDriver.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ private async Task BeginAsyncMigrationTask()
5454
var configResumed = true;
5555
try
5656
{
57-
clusterProvider.storeWrapper.store.PauseRevivification();
57+
clusterProvider.storeWrapper.store.PauseRevivification(_timeout, _cts.Token);
5858

5959
// Set target node to import state
6060
if (!TrySetSlotRanges(GetSourceNodeId, MigrateState.IMPORT))

libs/cluster/Session/RespClusterBasicCommands.cs

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4+
using System;
45
using System.Diagnostics;
56
using System.Text;
67
using Garnet.common;
@@ -75,17 +76,26 @@ private bool NetworkClusterForget(out bool invalidParameters)
7576
}
7677

7778
logger?.LogTrace("CLUSTER FORGET {nodeid} {seconds}", nodeId, expirySeconds);
78-
if (!clusterProvider.clusterManager.TryRemoveWorker(nodeId, expirySeconds, out var errorMessage))
79+
// NOTE: Release epoch to prevent deadlock when acquiring SuspendConfigMerge exclusive lock within TryRemoveWorker.
80+
ReleaseCurrentEpoch();
81+
try
7982
{
80-
while (!RespWriteUtils.TryWriteError(errorMessage, ref dcurr, dend))
81-
SendAndReset();
83+
if (!clusterProvider.clusterManager.TryRemoveWorker(nodeId, expirySeconds, out var errorMessage))
84+
{
85+
while (!RespWriteUtils.TryWriteError(errorMessage, ref dcurr, dend))
86+
SendAndReset();
87+
}
88+
else
89+
{
90+
// Terminate any outstanding migration tasks
91+
_ = clusterProvider.migrationManager.TryRemoveMigrationTask(nodeId);
92+
while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
93+
SendAndReset();
94+
}
8295
}
83-
else
96+
finally
8497
{
85-
// Terminate any outstanding migration tasks
86-
_ = clusterProvider.migrationManager.TryRemoveMigrationTask(nodeId);
87-
while (!RespWriteUtils.TryWriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
88-
SendAndReset();
98+
AcquireCurrentEpoch();
8999
}
90100

91101
return true;
@@ -379,7 +389,16 @@ private bool NetworkClusterGossip(out bool invalidParameters)
379389
// GossipWithMeet messages are only send through a call to CLUSTER MEET at the remote node
380390
if (gossipWithMeet || current.IsKnown(other.LocalNodeId))
381391
{
382-
_ = clusterProvider.clusterManager.TryMerge(other);
392+
// NOTE: release the epoch to avoid deadlock with MIGRATE config suspension
393+
ReleaseCurrentEpoch();
394+
try
395+
{
396+
_ = clusterProvider.clusterManager.TryMerge(other);
397+
}
398+
finally
399+
{
400+
AcquireCurrentEpoch();
401+
}
383402

384403
// Remember that this connection is being used for another cluster node to talk to us
385404
Debug.Assert(RemoteNodeId is null || RemoteNodeId == other.LocalNodeId, "Node Id shouldn't change once set for a connection");
@@ -448,7 +467,17 @@ private bool NetworkClusterReset(out bool invalidParameters)
448467
}
449468
}
450469

451-
var resp = clusterProvider.clusterManager.TryReset(soft, expirySeconds);
470+
// NOTE: Release epoch to prevent deadlock when acquiring SuspendConfigMerge exclusive lock within TryReset.
471+
ReleaseCurrentEpoch();
472+
ReadOnlySpan<byte> resp = default;
473+
try
474+
{
475+
resp = clusterProvider.clusterManager.TryReset(soft, expirySeconds);
476+
}
477+
finally
478+
{
479+
AcquireCurrentEpoch();
480+
}
452481
if (!soft) clusterProvider.FlushDB(true);
453482

454483
while (!RespWriteUtils.TryWriteDirect(resp, ref dcurr, dend))

libs/common/ReaderWriterLock.cs

Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT license.
3+
4+
using System;
5+
using System.Threading;
6+
7+
namespace Garnet.common
8+
{
9+
/// <summary>
10+
/// Reader/writer lock backed by semaphores.
11+
/// Supports a single writer or multiple concurrent readers.
12+
/// </summary>
13+
public sealed class ReaderWriterLock : IDisposable
14+
{
15+
enum LockOperation
16+
{
17+
Reader,
18+
Writer
19+
}
20+
21+
#if NET9_0_OR_GREATER
22+
readonly Lock mutex;
23+
#else
24+
readonly object mutex;
25+
#endif
26+
readonly SemaphoreSlim readerSemaphore;
27+
readonly SemaphoreSlim writerSemaphore;
28+
29+
// -1: writer holds the lock, 0: free, >0: number of active readers.
30+
int lockHeld;
31+
int waitingReaders;
32+
int waitingWriters;
33+
int grantedReaders;
34+
int grantedWriters;
35+
bool disposed;
36+
37+
public ReaderWriterLock()
38+
{
39+
mutex = new();
40+
readerSemaphore = new(0);
41+
writerSemaphore = new(0);
42+
}
43+
44+
public void Dispose()
45+
{
46+
lock (mutex)
47+
{
48+
if (disposed)
49+
return;
50+
disposed = true;
51+
}
52+
53+
readerSemaphore.Dispose();
54+
writerSemaphore.Dispose();
55+
}
56+
57+
/// <summary>
58+
/// Acquires an exclusive write lock, granting sole access to the resource for write operations.
59+
/// </summary>
60+
public void WriteLock()
61+
=> WriteLock(default);
62+
63+
/// <summary>
64+
/// Acquires an exclusive write lock, granting sole access to the resource for write operations.
65+
/// </summary>
66+
/// <param name="token">A cancellation token that can be used to cancel the lock acquisition process.</param>
67+
public void WriteLock(CancellationToken token)
68+
=> Acquire(LockOperation.Writer, token);
69+
70+
/// <summary>
71+
/// Release writer lock and wake either one writer or all waiting readers.
72+
/// </summary>
73+
public void WriteUnlock()
74+
=> Release();
75+
76+
/// <summary>
77+
/// Acquires a reader lock, allowing concurrent read access to the resource.
78+
/// </summary>
79+
public void ReadLock()
80+
=> ReaderLock(default);
81+
82+
/// <summary>
83+
/// Acquires a reader lock, allowing concurrent read access to the resource.
84+
/// </summary>
85+
/// <param name="token">The cancellation token used to signal the operation's cancellation.</param>
86+
public void ReaderLock(CancellationToken token)
87+
=> Acquire(LockOperation.Reader, token);
88+
89+
/// <summary>
90+
/// Release reader lock and wake one writer when this was the last active reader.
91+
/// </summary>
92+
public void ReadUnlock()
93+
=> Release();
94+
95+
void Acquire(LockOperation operation, CancellationToken cancellationToken)
96+
{
97+
SemaphoreSlim waitSemaphore;
98+
lock (mutex)
99+
{
100+
ObjectDisposedException.ThrowIf(disposed, nameof(ReaderWriterLock));
101+
102+
switch (operation)
103+
{
104+
case LockOperation.Reader:
105+
// Acquire reader lock immediately
106+
if (lockHeld >= 0 && waitingWriters == 0)
107+
{
108+
lockHeld++;
109+
return;
110+
}
111+
112+
// Prepare to wait because lock was not available for readers
113+
waitingReaders++;
114+
waitSemaphore = readerSemaphore;
115+
break;
116+
case LockOperation.Writer:
117+
// Acquire writer lock immediately
118+
if (lockHeld == 0)
119+
{
120+
lockHeld = -1;
121+
return;
122+
}
123+
124+
// Prepare to wait because lock was not available for writers
125+
waitingWriters++;
126+
waitSemaphore = writerSemaphore;
127+
break;
128+
default:
129+
throw new InvalidOperationException();
130+
}
131+
}
132+
133+
WaitForGrant(waitSemaphore, operation, cancellationToken);
134+
}
135+
136+
void WaitForGrant(SemaphoreSlim waitSemaphore, LockOperation operation, CancellationToken cancellationToken)
137+
{
138+
try
139+
{
140+
waitSemaphore.Wait(cancellationToken);
141+
}
142+
catch (OperationCanceledException)
143+
{
144+
var shouldRelease = false;
145+
lock (mutex)
146+
{
147+
ObjectDisposedException.ThrowIf(disposed, nameof(ReaderWriterLock));
148+
149+
switch (operation)
150+
{
151+
case LockOperation.Reader:
152+
if (grantedReaders > 0)
153+
{
154+
// lock was granted before cancellation
155+
grantedReaders--;
156+
shouldRelease = true;
157+
}
158+
else
159+
{
160+
// lock was not granted yet
161+
waitingReaders--;
162+
}
163+
break;
164+
case LockOperation.Writer:
165+
if (grantedWriters > 0)
166+
{
167+
// lock was granted before cancellation
168+
grantedWriters--;
169+
shouldRelease = true;
170+
}
171+
else
172+
{
173+
// lock was not granted yet
174+
waitingWriters--;
175+
}
176+
break;
177+
default:
178+
throw new InvalidOperationException();
179+
}
180+
}
181+
182+
if (shouldRelease)
183+
Release();
184+
185+
throw;
186+
}
187+
188+
lock (mutex)
189+
{
190+
ObjectDisposedException.ThrowIf(disposed, nameof(ReaderWriterLock));
191+
192+
switch (operation)
193+
{
194+
case LockOperation.Reader:
195+
if (grantedReaders <= 0)
196+
throw new SynchronizationLockException("Reader wakeup observed without matching grant.");
197+
// Release granted counters and not waiting counters since they have been decremented at release that granted the lock.
198+
grantedReaders--;
199+
break;
200+
case LockOperation.Writer:
201+
if (grantedWriters <= 0)
202+
throw new SynchronizationLockException("Writer wakeup observed without matching grant.");
203+
// Release granted counters and not waiting counters since they have been decremented at release that granted the lock.
204+
grantedWriters--;
205+
break;
206+
default:
207+
throw new InvalidOperationException();
208+
}
209+
}
210+
}
211+
212+
void Release()
213+
{
214+
lock (mutex)
215+
{
216+
ObjectDisposedException.ThrowIf(disposed, nameof(ReaderWriterLock));
217+
218+
if (lockHeld == 0)
219+
throw new SynchronizationLockException("Unlock called when lock is not held.");
220+
221+
// Update lock state first.
222+
if (lockHeld == -1)
223+
lockHeld = 0;
224+
else
225+
lockHeld--;
226+
227+
// Still held by active readers.
228+
if (lockHeld != 0)
229+
return;
230+
231+
// Writers have priority.
232+
if (waitingWriters > 0)
233+
{
234+
waitingWriters--;
235+
lockHeld = -1;
236+
grantedWriters++;
237+
_ = writerSemaphore.Release();
238+
return;
239+
}
240+
241+
// Consume readers next
242+
if (waitingReaders > 0)
243+
{
244+
var toRelease = waitingReaders;
245+
waitingReaders = 0;
246+
lockHeld = toRelease;
247+
grantedReaders += toRelease;
248+
_ = readerSemaphore.Release(toRelease);
249+
}
250+
}
251+
}
252+
}
253+
}

0 commit comments

Comments
 (0)