Skip to content

Commit 2725774

Browse files
authored
Migration Misc Fixes (#552)
* separate error message when adding keySlice to migration tracker * safely add and remove sessions in migration task store * fix slotsrange option * safely deal with multiple dispose calls to MigrateSession * fix bug when iterating stores * fix stores window scan tracking when copyOption is not enabled * remove logger messages * release version 1.0.18 * add slotsrange data test * add back some log messages * nit; change info to trace message
1 parent d156731 commit 2725774

12 files changed

+136
-97
lines changed

.azure/pipelines/azure-pipelines-external-release.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# 1) update the name: string below (line 6) -- this is the version for the nuget package (e.g. 1.0.0)
44
# 2) update \libs\host\GarnetServer.cs readonly string version (~line 45) -- NOTE - these two values need to be the same
55
######################################
6-
name: 1.0.17
6+
name: 1.0.18
77
trigger:
88
branches:
99
include:

libs/cluster/CmdStrings.cs

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ static class CmdStrings
5656
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_MIGRATE_TO_MYSELF => "ERR Can't MIGRATE to myself"u8;
5757
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_INCOMPLETESLOTSRANGE => "ERR incomplete slotrange"u8;
5858
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_SLOTNOTMIGRATING => "ERR slot state not set to MIGRATING state"u8;
59+
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_FAILEDTOADDKEY => "ERR Failed to add key for migration tracking"u8;
5960
public static ReadOnlySpan<byte> RESP_ERR_GENERIC_PARSING => "ERR Parsing error"u8;
6061

6162
/// <summary>

libs/cluster/Server/ClusterManager.cs

-4
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,7 @@ public void Start()
123123
public void FlushConfig()
124124
{
125125
lock (this)
126-
{
127-
logger?.LogTrace("Start FlushConfig {path}", clusterConfigDevice.FileName);
128126
ClusterUtils.WriteInto(clusterConfigDevice, pool, 0, currentConfig.ToByteArray(), logger: logger);
129-
logger?.LogTrace("End FlushConfig {path}", clusterConfigDevice.FileName);
130-
}
131127
}
132128

133129
/// <summary>

libs/cluster/Server/ClusterManagerSlotState.cs

+10-13
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public bool TryAddSlots(HashSet<int> slots, out int slotAssigned)
4848
break;
4949
}
5050
FlushConfig();
51-
logger?.LogTrace("ADD SLOTS {slots}", GetRange(slots.ToArray()));
51+
logger?.LogTrace("AddSlots {slots}", GetRange(slots.ToArray()));
5252
return true;
5353
}
5454

@@ -78,7 +78,7 @@ public bool TryRemoveSlots(HashSet<int> slots, out int notLocalSlot)
7878
break;
7979
}
8080
FlushConfig();
81-
logger?.LogTrace("REMOVE SLOTS {slots}", string.Join(",", slots));
81+
logger?.LogTrace("RemoveSlots {slots}", GetRange(slots.ToArray()));
8282
return true;
8383
}
8484

@@ -143,8 +143,7 @@ public bool TryPrepareSlotForMigration(int slot, string nodeid, out ReadOnlySpan
143143
break;
144144
}
145145
FlushConfig();
146-
147-
logger?.LogInformation("MIGRATE {slot} TO {currentConfig.GetWorkerAddressFromNodeId(nodeid)}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
146+
logger?.LogTrace("SetSlot MIGRATING {slot} TO {nodeId}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
148147
return true;
149148
}
150149

@@ -211,8 +210,7 @@ public bool TryPrepareSlotsForMigration(HashSet<int> slots, string nodeid, out R
211210
break;
212211
}
213212
FlushConfig();
214-
215-
logger?.LogInformation("MIGRATE {slot} TO {migrating node}", string.Join(' ', slots), currentConfig.GetWorkerAddressFromNodeId(nodeid));
213+
logger?.LogTrace("SetSlotsRange MIGRATING {slot} TO {nodeId}", GetRange(slots.ToArray()), currentConfig.GetWorkerAddressFromNodeId(nodeid));
216214
return true;
217215
}
218216

@@ -266,8 +264,7 @@ public bool TryPrepareSlotForImport(int slot, string nodeid, out ReadOnlySpan<by
266264
break;
267265
}
268266
FlushConfig();
269-
270-
logger?.LogInformation("IMPORT {slot} FROM {currentConfig.GetWorkerAddressFromNodeId(nodeid)}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
267+
logger?.LogTrace("SetSlot IMPORTING {slot} TO {nodeId}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
271268
return true;
272269
}
273270

@@ -329,8 +326,8 @@ public bool TryPrepareSlotsForImport(HashSet<int> slots, string nodeid, out Read
329326
if (Interlocked.CompareExchange(ref currentConfig, newConfig, current) == current)
330327
break;
331328
}
332-
333-
logger?.LogInformation("IMPORT {slot} FROM {importingNode}", string.Join(' ', slots), currentConfig.GetWorkerAddressFromNodeId(nodeid));
329+
FlushConfig();
330+
logger?.LogTrace("SetSlotsRange IMPORTING {slot} TO {nodeId}", GetRange(slots.ToArray()), currentConfig.GetWorkerAddressFromNodeId(nodeid));
334331
return true;
335332
}
336333

@@ -364,7 +361,7 @@ public bool TryPrepareSlotForOwnershipChange(int slot, string nodeid, out ReadOn
364361
break;
365362
}
366363
FlushConfig();
367-
logger?.LogInformation("SLOT {slot} IMPORTED TO {nodeid}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
364+
logger?.LogTrace("SLOT {slot} MIGRATED TO {nodeid}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
368365
return true;
369366
}
370367
else if (current.GetState((ushort)slot) is SlotState.IMPORTING)
@@ -385,7 +382,7 @@ public bool TryPrepareSlotForOwnershipChange(int slot, string nodeid, out ReadOn
385382
break;
386383
}
387384
FlushConfig();
388-
logger?.LogInformation("SLOT {slot} IMPORTED FROM {nodeid}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
385+
logger?.LogTrace("SLOT {slot} IMPORTED FROM {nodeid}", slot, currentConfig.GetWorkerAddressFromNodeId(nodeid));
389386
return true;
390387
}
391388
return true;
@@ -418,7 +415,7 @@ public bool TryPrepareSlotsForOwnershipChange(HashSet<int> slots, string nodeid,
418415
}
419416

420417
FlushConfig();
421-
logger?.LogInformation("SLOT {slot} IMPORTED TO {endpoint}", slots, currentConfig.GetWorkerAddressFromNodeId(nodeid));
418+
logger?.LogTrace("Slots {slot} IMPORTED TO {endpoint}", GetRange(slots.ToArray()), currentConfig.GetWorkerAddressFromNodeId(nodeid));
422419
return true;
423420
}
424421

libs/cluster/Server/Migration/MigrateSession.cs

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ internal sealed unsafe partial class MigrateSession : IDisposable
4545
readonly List<(int, int)> _slotRanges;
4646
readonly Dictionary<ArgSlice, KeyMigrationStatus> _keys;
4747
SingleWriterMultiReaderLock _keyDictLock;
48+
SingleWriterMultiReaderLock _disposed;
4849

4950
readonly HashSet<int> _sslots;
5051
readonly CancellationTokenSource _cts = new();
@@ -222,6 +223,7 @@ internal MigrateSession(
222223
/// </summary>
223224
public void Dispose()
224225
{
226+
if (!_disposed.TryWriteLock()) return;
225227
_cts?.Cancel();
226228
_cts?.Dispose();
227229
_gcs.Dispose();

libs/cluster/Server/Migration/MigrateSessionSlots.cs

+2-4
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ public bool MigrateSlotsDriver()
2222
while (true)
2323
{
2424
// Iterate main store
25-
if (!localServerSession.BasicGarnetApi.IterateMainStore(ref mainStoreGetKeysInSlots, storeTailAddress))
26-
return false;
25+
_ = localServerSession.BasicGarnetApi.IterateMainStore(ref mainStoreGetKeysInSlots, storeTailAddress);
2726

2827
// If did not acquire any keys stop scanning
2928
if (_keys.IsNullOrEmpty())
@@ -50,8 +49,7 @@ public bool MigrateSlotsDriver()
5049
while (true)
5150
{
5251
// Iterate object store
53-
if (!localServerSession.BasicGarnetApi.IterateObjectStore(ref objectStoreGetKeysInSlots, objectStoreTailAddress))
54-
return false;
52+
_ = localServerSession.BasicGarnetApi.IterateObjectStore(ref objectStoreGetKeysInSlots, objectStoreTailAddress);
5553

5654
// If did not acquire any keys stop scanning
5755
if (_keys.IsNullOrEmpty())

libs/cluster/Server/Migration/MigrateSessionTaskStore.cs

+23-11
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,13 @@ public int GetNumSessions()
5151
_lock.ReadLock();
5252
if (_disposed) return 0;
5353

54+
HashSet<MigrateSession> ss = [];
5455
for (var i = 0; i < sessions.Length; i++)
55-
count += sessions[i] != null ? 1 : 0;
56+
{
57+
if (sessions[i] != null)
58+
_ = ss.Add(sessions[i]);
59+
}
60+
count = ss.Count;
5661
}
5762
finally
5863
{
@@ -101,6 +106,7 @@ public bool TryAddMigrateSession(
101106
_lock.WriteLock();
102107
if (_disposed) return false;
103108

109+
// First iterate and check if corresponding slot is associated to another active migrate session
104110
foreach (var slot in mSession.GetSlots)
105111
{
106112
if (sessions[slot] != null)
@@ -109,8 +115,12 @@ public bool TryAddMigrateSession(
109115
success = false;
110116
return false;
111117
}
112-
sessions[slot] = mSession;
113118
}
119+
120+
// If reached this point all slots to be migrated are not associated with any other session
121+
// so we can mark them as being associated with this newly added session
122+
foreach (var slot in mSession.GetSlots)
123+
sessions[slot] = mSession;
114124
}
115125
catch (Exception ex)
116126
{
@@ -129,6 +139,11 @@ public bool TryAddMigrateSession(
129139
return success;
130140
}
131141

142+
/// <summary>
143+
/// Remove only the provided session instance
144+
/// </summary>
145+
/// <param name="mSession"></param>
146+
/// <returns></returns>
132147
public bool TryRemove(MigrateSession mSession)
133148
{
134149
try
@@ -156,30 +171,27 @@ public bool TryRemove(MigrateSession mSession)
156171
}
157172
}
158173

174+
/// <summary>
175+
/// Remove all sessions associated with the provided targetNodeId
176+
/// </summary>
177+
/// <param name="targetNodeId"></param>
178+
/// <returns></returns>
159179
public bool TryRemove(string targetNodeId)
160180
{
161181
try
162182
{
163183
_lock.WriteLock();
164184
if (_disposed) return false;
165-
HashSet<MigrateSession> mSessions = null;
166185
for (var i = 0; i < sessions.Length; i++)
167186
{
168187
var s = sessions[i];
169188
if (s != null && s.GetTargetNodeId.Equals(targetNodeId, StringComparison.Ordinal))
170189
{
171190
sessions[i] = null;
172-
mSessions ??= [];
173-
_ = mSessions.Add(s);
191+
s.Dispose();
174192
}
175193
}
176194

177-
if (mSessions != null)
178-
{
179-
foreach (var session in mSessions)
180-
session.Dispose();
181-
}
182-
183195
return true;
184196
}
185197
catch (Exception ex)

libs/cluster/Server/Migration/MigrationKeyIterationFunctions.cs

+3-61
Original file line numberDiff line numberDiff line change
@@ -14,64 +14,6 @@ internal sealed unsafe partial class MigrateSession : IDisposable
1414
{
1515
internal sealed class MigrationKeyIterationFunctions
1616
{
17-
internal struct MainStoreMigrateSlots : IScanIteratorFunctions<SpanByte, SpanByte>
18-
{
19-
readonly MigrateSession session;
20-
readonly HashSet<int> slots;
21-
22-
internal MainStoreMigrateSlots(MigrateSession session, HashSet<int> slots)
23-
{
24-
this.session = session;
25-
this.slots = slots;
26-
}
27-
28-
public bool SingleReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
29-
{
30-
cursorRecordResult = CursorRecordResult.Accept; // default; not used here
31-
var s = HashSlotUtils.HashSlot(ref key);
32-
33-
if (slots.Contains(s) && !ClusterSession.Expired(ref value) && !session.WriteOrSendMainStoreKeyValuePair(ref key, ref value))
34-
return false;
35-
return true;
36-
}
37-
public bool ConcurrentReader(ref SpanByte key, ref SpanByte value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
38-
=> SingleReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult);
39-
public bool OnStart(long beginAddress, long endAddress) => true;
40-
public void OnStop(bool completed, long numberOfRecords) { }
41-
public void OnException(Exception exception, long numberOfRecords) { }
42-
}
43-
44-
internal struct ObjectStoreMigrateSlots : IScanIteratorFunctions<byte[], IGarnetObject>
45-
{
46-
readonly MigrateSession session;
47-
readonly HashSet<int> slots;
48-
49-
internal ObjectStoreMigrateSlots(MigrateSession session, HashSet<int> slots)
50-
{
51-
this.session = session;
52-
this.slots = slots;
53-
}
54-
55-
public bool SingleReader(ref byte[] key, ref IGarnetObject value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
56-
{
57-
cursorRecordResult = CursorRecordResult.Accept; // default; not used here
58-
var slot = HashSlotUtils.HashSlot(key);
59-
60-
if (slots.Contains(slot) && !ClusterSession.Expired(ref value))
61-
{
62-
byte[] objectData = GarnetObjectSerializer.Serialize(value);
63-
if (!session.WriteOrSendObjectStoreKeyValuePair(key, objectData, value.Expiration))
64-
return false;
65-
}
66-
return true;
67-
}
68-
public bool ConcurrentReader(ref byte[] key, ref IGarnetObject value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
69-
=> SingleReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult);
70-
public bool OnStart(long beginAddress, long endAddress) => true;
71-
public void OnStop(bool completed, long numberOfRecords) { }
72-
public void OnException(Exception exception, long numberOfRecords) { }
73-
}
74-
7517
internal unsafe struct MainStoreGetKeysInSlots : IScanIteratorFunctions<SpanByte, SpanByte>
7618
{
7719
MigrationScanIterator iterator;
@@ -148,7 +90,6 @@ public void OnStop(bool completed, long numberOfRecords) { }
14890
public void OnException(Exception exception, long numberOfRecords) { }
14991
}
15092

151-
15293
internal struct MigrationScanIterator
15394
{
15495
readonly MigrateSession session;
@@ -198,8 +139,9 @@ public void AdvanceIterator()
198139
/// <returns></returns>
199140
public bool Consume(ref Span<byte> key)
200141
{
201-
// Check if key is within the current processing window
202-
if (currentOffset < offset)
142+
// Check if key is within the current processing window only if _copyOption is set
143+
// in order to skip keys that have been send over to target node but not deleted locally
144+
if (session._copyOption && currentOffset < offset)
203145
{
204146
currentOffset++;
205147
return true;

libs/cluster/Session/MigrateCommand.cs

+10-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ internal enum MigrateCmdParseState : byte
3030
SLOTOUTOFRANGE,
3131
NOTMIGRATING,
3232
MULTI_TRANSFER_OPTION,
33+
FAILEDTOADDKEY
3334
}
3435

3536
private bool HandleCommandParsingErrors(MigrateCmdParseState mpState, string targetAddress, int targetPort, int slotMultiRef)
@@ -48,6 +49,7 @@ private bool HandleCommandParsingErrors(MigrateCmdParseState mpState, string tar
4849
MigrateCmdParseState.INCOMPLETESLOTSRANGE => CmdStrings.RESP_ERR_GENERIC_INCOMPLETESLOTSRANGE,
4950
MigrateCmdParseState.SLOTOUTOFRANGE => Encoding.ASCII.GetBytes($"ERR Slot {slotMultiRef} out of range."),
5051
MigrateCmdParseState.NOTMIGRATING => CmdStrings.RESP_ERR_GENERIC_SLOTNOTMIGRATING,
52+
MigrateCmdParseState.FAILEDTOADDKEY => CmdStrings.RESP_ERR_GENERIC_FAILEDTOADDKEY,
5153
_ => CmdStrings.RESP_ERR_GENERIC_PARSING,
5254
};
5355
while (!RespWriteUtils.WriteError(errorMessage, ref dcurr, dend))
@@ -181,9 +183,12 @@ private bool TryMIGRATE(int count, byte* ptr)
181183

182184
// Add pointer of current parsed key
183185
if (!keys.TryAdd(new ArgSlice(keyPtr, ksize), KeyMigrationStatus.QUEUED))
186+
{
184187
logger?.LogWarning($"Failed to add {{key}}", Encoding.ASCII.GetString(keyPtr, ksize));
185-
else
186-
_ = slots.Add(slot);
188+
pstate = MigrateCmdParseState.FAILEDTOADDKEY;
189+
continue;
190+
}
191+
_ = slots.Add(slot);
187192
}
188193
}
189194
else if (option.Equals("SLOTS", StringComparison.OrdinalIgnoreCase))
@@ -228,6 +233,9 @@ private bool TryMIGRATE(int count, byte* ptr)
228233
}
229234
else if (option.Equals("SLOTSRANGE", StringComparison.OrdinalIgnoreCase))
230235
{
236+
if (transferOption == TransferOption.KEYS)
237+
pstate = MigrateCmdParseState.MULTI_TRANSFER_OPTION;
238+
transferOption = TransferOption.SLOTS;
231239
slots = [];
232240
if (args == 0 || (args & 0x1) > 0)
233241
{

libs/host/GarnetServer.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class GarnetServer : IDisposable
5050
protected StoreWrapper storeWrapper;
5151

5252
// IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~6.
53-
readonly string version = "1.0.17";
53+
readonly string version = "1.0.18";
5454

5555
/// <summary>
5656
/// Resp protocol version

0 commit comments

Comments
 (0)