Skip to content

Commit 1d90f7f

Browse files
committed
cleanup migrate keys
1 parent 044d793 commit 1d90f7f

File tree

8 files changed

+146
-215
lines changed

8 files changed

+146
-215
lines changed

libs/cluster/Server/Migration/MigrateSession.cs

Lines changed: 9 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,6 @@ internal sealed unsafe partial class MigrateSession : IDisposable
7878
/// </summary>
7979
public LimitedFixedBufferPool GetNetworkPool => clusterProvider.migrationManager.GetNetworkPool;
8080

81-
readonly GarnetClientSession _gcs;
82-
8381
/// <summary>
8482
/// Check for overlapping slots between migrate sessions
8583
/// </summary>
@@ -94,15 +92,10 @@ public bool Overlap(MigrateSession session)
9492
readonly TransferOption transferOption;
9593

9694
/// <summary>
97-
/// MigrateSlotsScan for background slot migrate tasks
95+
/// MigrateTask for background slot migrate tasks
9896
/// </summary>
9997
readonly MigrateTask[] migrateTasks;
10098

101-
/// <summary>
102-
/// LocalServerSessions for background slot migrate tasks
103-
/// </summary>
104-
readonly LocalServerSession[] localServerSessions;
105-
10699
/// <summary>
107100
/// MigrateSession Constructor
108101
/// </summary>
@@ -154,25 +147,16 @@ internal MigrateSession(
154147

155148
Status = MigrateState.PENDING;
156149

157-
// Single key value size + few bytes for command header and arguments
158-
_gcs = GetGarnetClient();
159-
160150
if (transferOption == TransferOption.SLOTS)
161151
{
162152
migrateTasks = new MigrateTask[clusterProvider.serverOptions.ParallelMigrateTasks];
163-
localServerSessions = new LocalServerSession[clusterProvider.serverOptions.ParallelMigrateTasks];
164153
for (var i = 0; i < migrateTasks.Length; i++)
165-
{
166154
migrateTasks[i] = new MigrateTask(this);
167-
localServerSessions[i] = new LocalServerSession(clusterProvider.storeWrapper);
168-
}
169155
}
170156
else
171157
{
172158
migrateTasks = new MigrateTask[1];
173159
migrateTasks[0] = new MigrateTask(this, sketch: sketch);
174-
localServerSessions = new LocalServerSession[1];
175-
localServerSessions[0] = new LocalServerSession(clusterProvider.storeWrapper);
176160
}
177161
}
178162

@@ -197,26 +181,22 @@ public void Dispose()
197181
if (!_disposed.TryWriteLock()) return;
198182
_cts?.Cancel();
199183
_cts?.Dispose();
200-
_gcs.Dispose();
201184

202185
for (var i = 0; i < migrateTasks.Length; i++)
203-
{
204186
migrateTasks[i].Dispose();
205-
localServerSessions[i].Dispose();
206-
}
207187
}
208188

209-
private bool CheckConnection()
189+
private bool CheckConnection(GarnetClientSession client)
210190
{
211-
bool status = true;
212-
if (!_gcs.IsConnected)
191+
var status = true;
192+
if (!client.IsConnected)
213193
{
214-
_gcs.Reconnect((int)_timeout.TotalMilliseconds);
194+
client.Reconnect((int)_timeout.TotalMilliseconds);
215195
if (_passwd != null)
216196
{
217197
try
218198
{
219-
status = _gcs.Authenticate(_username, _passwd).ContinueWith(resp =>
199+
status = client.Authenticate(_username, _passwd).ContinueWith(resp =>
220200
{
221201
// Check if authenticate succeeded
222202
if (!resp.Result.Equals("OK", StringComparison.Ordinal))
@@ -274,9 +254,10 @@ private bool CheckConnection()
274254
public bool TrySetSlotRanges(string nodeid, MigrateState state)
275255
{
276256
var status = false;
257+
var client = migrateTasks[0].Client;
277258
try
278259
{
279-
if (!CheckConnection())
260+
if (!CheckConnection(client))
280261
return false;
281262
var stateBytes = state switch
282263
{
@@ -286,7 +267,7 @@ public bool TrySetSlotRanges(string nodeid, MigrateState state)
286267
_ => throw new Exception("Invalid SETSLOT Operation"),
287268
};
288269

289-
status = _gcs.SetSlotRange(stateBytes, nodeid, _slotRanges).ContinueWith(resp =>
270+
status = client.SetSlotRange(stateBytes, nodeid, _slotRanges).ContinueWith(resp =>
290271
{
291272
// Check if setslotsrange executed correctly
292273
if (!resp.Result.Equals("OK", StringComparison.Ordinal))

libs/cluster/Server/Migration/MigrateSessionCommonUtils.cs

Lines changed: 4 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,10 @@ namespace Garnet.cluster
1212
{
1313
internal sealed unsafe partial class MigrateSession : IDisposable
1414
{
15-
/// <summary>
16-
/// Write to network buffer or send if full the previous payload and then write to network before the associated kv-pair.
17-
/// </summary>
18-
/// <param name="gcs"></param>
19-
/// <param name="localServerSession"></param>
20-
/// <param name="key"></param>
21-
/// <param name="input"></param>
22-
/// <param name="o"></param>
23-
/// <returns></returns>
24-
private bool WriteOrSendMainStoreKeyValuePair(GarnetClientSession gcs, LocalServerSession localServerSession, ref SpanByte key, ref RawStringInput input, ref SpanByteAndMemory o)
15+
private bool WriteOrSendMainStoreKeyValuePair(GarnetClientSession gcs, LocalServerSession localServerSession, ref SpanByte key, ref RawStringInput input, ref SpanByteAndMemory o, out GarnetStatus status)
2516
{
2617
// Read value for key
27-
var status = localServerSession.BasicGarnetApi.Read_MainStore(ref key, ref input, ref o);
18+
status = localServerSession.BasicGarnetApi.Read_MainStore(ref key, ref input, ref o);
2819

2920
// Skip if key NOTFOUND
3021
if (status == GarnetStatus.NOTFOUND)
@@ -64,13 +55,13 @@ bool WriteOrSendMainStoreKeyValuePair(GarnetClientSession gcs, ref SpanByte key,
6455
}
6556
}
6657

67-
private bool WriteOrSendObjectStoreKeyValuePair(GarnetClientSession gcs, LocalServerSession localServerSession, ref ArgSlice key, ref SpanByteAndMemory o)
58+
private bool WriteOrSendObjectStoreKeyValuePair(GarnetClientSession gcs, LocalServerSession localServerSession, ref ArgSlice key, out GarnetStatus status)
6859
{
6960
var keyByteArray = key.ToArray();
7061

7162
ObjectInput input = default;
7263
GarnetObjectStoreOutput value = default;
73-
var status = localServerSessions[0].BasicGarnetApi.Read_ObjectStore(ref keyByteArray, ref input, ref value);
64+
status = localServerSession.BasicGarnetApi.Read_ObjectStore(ref keyByteArray, ref input, ref value);
7465

7566
// Skip if key NOTFOUND
7667
if (status == GarnetStatus.NOTFOUND)
@@ -103,54 +94,6 @@ bool WriteOrSendObjectStoreKeyValuePair(GarnetClientSession gcs, byte[] key, byt
10394
}
10495
}
10596

106-
/// <summary>
107-
/// Write main store key-value pair directly to client buffer or flush buffer to make space and try again writing.
108-
/// </summary>
109-
/// <param name="key"></param>
110-
/// <param name="value"></param>
111-
/// <returns>True on success, else false</returns>
112-
private bool WriteOrSendMainStoreKeyValuePair(ref SpanByte key, ref SpanByte value)
113-
{
114-
// Check if we need to initialize cluster migrate command arguments
115-
if (_gcs.NeedsInitialization)
116-
_gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isMainStore: true);
117-
118-
// Try write serialized key value to client buffer
119-
while (!_gcs.TryWriteKeyValueSpanByte(ref key, ref value, out var task))
120-
{
121-
// Flush key value pairs in the buffer
122-
if (!HandleMigrateTaskResponse(task))
123-
return false;
124-
125-
// re-initialize cluster migrate command parameters
126-
_gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isMainStore: true);
127-
}
128-
return true;
129-
}
130-
131-
/// <summary>
132-
/// Write object store key-value pair directly to client buffer or flush buffer to make space and try again writing.
133-
/// </summary>
134-
/// <param name="key"></param>
135-
/// <param name="value"></param>
136-
/// <param name="expiration"></param>
137-
/// <returns></returns>
138-
private bool WriteOrSendObjectStoreKeyValuePair(byte[] key, byte[] value, long expiration)
139-
{
140-
// Check if we need to initialize cluster migrate command arguments
141-
if (_gcs.NeedsInitialization)
142-
_gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isMainStore: false);
143-
144-
while (!_gcs.TryWriteKeyValueByteArray(key, value, expiration, out var task))
145-
{
146-
// Flush key value pairs in the buffer
147-
if (!HandleMigrateTaskResponse(task))
148-
return false;
149-
_gcs.SetClusterMigrateHeader(_sourceNodeId, _replaceOption, isMainStore: false);
150-
}
151-
return true;
152-
}
153-
15497
/// <summary>
15598
/// Handle response from migrate data task
15699
/// </summary>

libs/cluster/Server/Migration/MigrateSessionKeyAccess.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ public bool CanAccessKey(ref ArgSlice key, int slot, bool readOnly)
3838
return true;
3939

4040
var state = SketchStatus.INITIALIZING;
41-
foreach (var migrateScan in migrateTasks)
41+
foreach (var migrateTask in migrateTasks)
4242
{
43-
if (migrateScan.sketch.Probe(key.SpanByte, out state))
43+
if (migrateTask.sketch.Probe(key.SpanByte, out state))
4444
goto found;
4545
}
4646

libs/cluster/Server/Migration/MigrateSessionKeys.cs

Lines changed: 27 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -25,52 +25,20 @@ private bool MigrateKeysFromMainStore()
2525
var bufPtr = buffer.GetValidPointer();
2626
var bufPtrEnd = bufPtr + bufferSize;
2727
var o = new SpanByteAndMemory(bufPtr, (int)(bufPtrEnd - bufPtr));
28+
var migrateTask = migrateTasks[0];
2829

2930
try
3031
{
3132
// Transition keys to MIGRATING status
32-
migrateTasks[0].sketch.SetStatus(SketchStatus.TRANSMITTING);
33+
migrateTask.sketch.SetStatus(SketchStatus.TRANSMITTING);
3334
WaitForConfigPropagation();
3435

35-
////////////////
36-
// Build Input//
37-
////////////////
38-
var input = new RawStringInput(RespCommandAccessor.MIGRATE);
39-
var keys = migrateTasks[0].sketch.Keys;
40-
for (var i = 0; i < migrateTasks[0].sketch.Keys.Count; i++)
36+
// Transmit keys from main store
37+
if (!migrateTask.TransmitKeys(StoreType.Main))
4138
{
42-
if (keys[i].Item2) continue;
43-
var spanByte = keys[i].Item1.SpanByte;
44-
45-
// Read value for key
46-
var status = localServerSessions[0].BasicGarnetApi.Read_MainStore(ref spanByte, ref input, ref o);
47-
48-
// Skip if key NOTFOUND
49-
if (status == GarnetStatus.NOTFOUND)
50-
continue;
51-
52-
// Get SpanByte from stack if any
53-
ref var value = ref o.SpanByte;
54-
if (!o.IsSpanByte)
55-
{
56-
// Reinterpret heap memory to SpanByte
57-
value = ref SpanByte.ReinterpretWithoutLength(o.Memory.Memory.Span);
58-
}
59-
60-
// Write key to network buffer if it has not expired
61-
if (!ClusterSession.Expired(ref value) && !WriteOrSendMainStoreKeyValuePair(ref spanByte, ref value))
62-
return false;
63-
64-
// Reset SpanByte for next read if any but don't dispose heap buffer as we might re-use it
65-
o.SpanByte = new SpanByte((int)(bufPtrEnd - bufPtr), (IntPtr)bufPtr);
66-
67-
// Mark for deletion
68-
keys[i] = (keys[i].Item1, true);
69-
}
70-
71-
// Flush data in client buffer
72-
if (!HandleMigrateTaskResponse(_gcs.SendAndResetIterationBuffer()))
39+
logger?.LogError("Failed transmitting keys from main store");
7340
return false;
41+
}
7442

7543
DeleteKeys();
7644
}
@@ -93,40 +61,18 @@ private bool MigrateKeysFromMainStore()
9361
/// <returns>True on success, false otherwise</returns>
9462
private bool MigrateKeysFromObjectStore()
9563
{
96-
// NOTE: Any keys not found in main store are automatically set to QUEUED before this method is called
97-
// Transition all QUEUED to MIGRATING state
98-
migrateTasks[0].sketch.SetStatus(SketchStatus.TRANSMITTING);
64+
var migrateTask = migrateTasks[0];
65+
// NOTE: Any keys not found in main store are automatically set to INITIALIZING before this method is called
66+
// Transition all INITIALIZING to TRANSMITTING state
67+
migrateTask.sketch.SetStatus(SketchStatus.TRANSMITTING);
9968
WaitForConfigPropagation();
10069

101-
var keys = migrateTasks[0].sketch.Keys;
102-
for (var i = 0; i < migrateTasks[0].sketch.Keys.Count; i++)
70+
// Transmit keys from object store
71+
if (!migrateTask.TransmitKeys(StoreType.Object))
10372
{
104-
if (keys[i].Item2) continue;
105-
var keyByteArray = keys[i].Item1.ToArray();
106-
107-
ObjectInput input = default;
108-
GarnetObjectStoreOutput value = default;
109-
var status = localServerSessions[0].BasicGarnetApi.Read_ObjectStore(ref keyByteArray, ref input, ref value);
110-
111-
// Skip if key NOTFOUND
112-
if (status == GarnetStatus.NOTFOUND)
113-
continue;
114-
115-
if (!ClusterSession.Expired(ref value.GarnetObject))
116-
{
117-
var objectData = GarnetObjectSerializer.Serialize(value.GarnetObject);
118-
119-
if (!WriteOrSendObjectStoreKeyValuePair(keyByteArray, objectData, value.GarnetObject.Expiration))
120-
return false;
121-
}
122-
123-
// Mark for deletion
124-
keys[i] = (keys[i].Item1, true);
125-
}
126-
127-
// Flush data in client buffer
128-
if (!HandleMigrateTaskResponse(_gcs.SendAndResetIterationBuffer()))
73+
logger?.LogError("Failed transmitting keys from object store");
12974
return false;
75+
}
13076

13177
// Delete keys if COPY option is false or transition KEYS from MIGRATING to MIGRATED status
13278
DeleteKeys();
@@ -138,51 +84,39 @@ private bool MigrateKeysFromObjectStore()
13884
/// </summary>
13985
private void DeleteKeys()
14086
{
141-
if (_copyOption)
142-
goto migrated;
143-
144-
// Transition to deleting to block read requests
145-
migrateTasks[0].sketch.SetStatus(SketchStatus.DELETING);
87+
var migrateTask = migrateTasks[0];
88+
// Transition to deleting to block read requests
89+
migrateTask.sketch.SetStatus(SketchStatus.DELETING);
14690
WaitForConfigPropagation();
14791

148-
foreach (var pair in migrateTasks[0].sketch.Keys)
149-
{
150-
if (!pair.Item2) continue;
151-
var spanByte = pair.Item1.SpanByte;
152-
_ = localServerSessions[0].BasicGarnetApi.DELETE(ref spanByte);
153-
}
92+
// Delete keys
93+
migrateTask.DeleteKeys();
15494

155-
migrated:
15695
// Transition to MIGRATED to release waiting operations
157-
migrateTasks[0].sketch.SetStatus(SketchStatus.MIGRATED);
96+
migrateTask.sketch.SetStatus(SketchStatus.MIGRATED);
15897
WaitForConfigPropagation();
15998
}
16099

161100
/// <summary>
162101
/// Method used to migrate keys from main and object stores.
163102
/// This method is used to process the MIGRATE KEYS transfer option.
164103
/// </summary>
165-
/// <param name="storeType"></param>
166104
/// <returns></returns>
167-
public bool MigrateKeys(StoreType storeType = StoreType.All)
105+
public bool MigrateKeys()
168106
{
169107
try
170108
{
171-
if (!CheckConnection())
109+
var migrateTask = migrateTasks[0];
110+
if (!migrateTask.Initialize())
172111
return false;
173112

174-
if (storeType is StoreType.All or StoreType.Main)
175-
{
176-
// Migrate main store keys
177-
_gcs.InitializeIterationBuffer(clusterProvider.storeWrapper.loggingFrequency);
178-
if (!MigrateKeysFromMainStore())
179-
return false;
180-
}
113+
// Migrate main store keys
114+
if (!MigrateKeysFromMainStore())
115+
return false;
181116

182117
// Migrate object store keys
183-
if (!clusterProvider.serverOptions.DisableObjects && storeType is StoreType.All or StoreType.Object)
118+
if (!clusterProvider.serverOptions.DisableObjects)
184119
{
185-
_gcs.InitializeIterationBuffer(clusterProvider.storeWrapper.loggingFrequency);
186120
if (!MigrateKeysFromObjectStore())
187121
return false;
188122
}

0 commit comments

Comments
 (0)