From e50604443fa259908f11d7e7981a529338db6c3c Mon Sep 17 00:00:00 2001 From: Vijay-Nirmal Date: Mon, 16 Dec 2024 00:33:39 +0530 Subject: [PATCH 1/4] Added BZMPOP, BZPOPMAX and BZPOPIN commands --- libs/resources/RespCommandsDocs.json | 104 +++++++++++ libs/resources/RespCommandsInfo.json | 72 ++++++++ .../ItemBroker/CollectionItemBroker.cs | 58 ++++--- .../ItemBroker/CollectionItemResult.cs | 11 ++ .../Objects/SortedSet/SortedSetObjectImpl.cs | 14 ++ libs/server/Resp/Objects/SortedSetCommands.cs | 162 ++++++++++++++++++ libs/server/Resp/Parser/RespCommand.cs | 15 ++ libs/server/Resp/RespServerSession.cs | 3 + .../Session/ObjectStore/SortedSetOps.cs | 21 ++- .../CommandInfoUpdater/SupportedCommand.cs | 3 + test/Garnet.test/Resp/ACL/RespCommandTests.cs | 45 +++++ ...ckingListTests.cs => RespBlockingTests.cs} | 144 +++++++++++++++- website/docs/commands/api-compatibility.md | 6 +- website/docs/commands/data-structures.md | 60 +++++++ 14 files changed, 684 insertions(+), 34 deletions(-) rename test/Garnet.test/{RespBlockingListTests.cs => RespBlockingTests.cs} (69%) diff --git a/libs/resources/RespCommandsDocs.json b/libs/resources/RespCommandsDocs.json index 10fc7b6e57..76fe840094 100644 --- a/libs/resources/RespCommandsDocs.json +++ b/libs/resources/RespCommandsDocs.json @@ -773,6 +773,110 @@ } ] }, + { + "Command": "BZMPOP", + "Name": "BZMPOP", + "Summary": "Removes and returns a member by score from one or more sorted sets. Blocks until a member is available otherwise. Deletes the sorted set if the last element was popped.", + "Group": "SortedSet", + "Complexity": "O(K) \u002B O(M*log(N)) where K is the number of provided keys, N being the number of elements in the sorted set, and M being the number of elements popped.", + "Arguments": [ + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "TIMEOUT", + "DisplayText": "timeout", + "Type": "Double" + }, + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "NUMKEYS", + "DisplayText": "numkeys", + "Type": "Integer" + }, + { + "TypeDiscriminator": "RespCommandKeyArgument", + "Name": "KEY", + "DisplayText": "key", + "Type": "Key", + "ArgumentFlags": "Multiple", + "KeySpecIndex": 0 + }, + { + "TypeDiscriminator": "RespCommandContainerArgument", + "Name": "WHERE", + "Type": "OneOf", + "Arguments": [ + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "MIN", + "DisplayText": "min", + "Type": "PureToken", + "Token": "MIN" + }, + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "MAX", + "DisplayText": "max", + "Type": "PureToken", + "Token": "MAX" + } + ] + }, + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "COUNT", + "DisplayText": "count", + "Type": "Integer", + "Token": "COUNT", + "ArgumentFlags": "Optional" + } + ] + }, + { + "Command": "BZPOPMAX", + "Name": "BZPOPMAX", + "Summary": "Removes and returns the member with the highest score from one or more sorted sets. Blocks until a member available otherwise. Deletes the sorted set if the last element was popped.", + "Group": "SortedSet", + "Complexity": "O(log(N)) with N being the number of elements in the sorted set.", + "Arguments": [ + { + "TypeDiscriminator": "RespCommandKeyArgument", + "Name": "KEY", + "DisplayText": "key", + "Type": "Key", + "ArgumentFlags": "Multiple", + "KeySpecIndex": 0 + }, + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "TIMEOUT", + "DisplayText": "timeout", + "Type": "Double" + } + ] + }, + { + "Command": "BZPOPMIN", + "Name": "BZPOPMIN", + "Summary": "Removes and returns the member with the lowest score from one or more sorted sets. Blocks until a member is available otherwise. Deletes the sorted set if the last element was popped.", + "Group": "SortedSet", + "Complexity": "O(log(N)) with N being the number of elements in the sorted set.", + "Arguments": [ + { + "TypeDiscriminator": "RespCommandKeyArgument", + "Name": "KEY", + "DisplayText": "key", + "Type": "Key", + "ArgumentFlags": "Multiple", + "KeySpecIndex": 0 + }, + { + "TypeDiscriminator": "RespCommandBasicArgument", + "Name": "TIMEOUT", + "DisplayText": "timeout", + "Type": "Double" + } + ] + }, { "Command": "CLIENT", "Name": "CLIENT", diff --git a/libs/resources/RespCommandsInfo.json b/libs/resources/RespCommandsInfo.json index 4c06e3e447..2ea4ada975 100644 --- a/libs/resources/RespCommandsInfo.json +++ b/libs/resources/RespCommandsInfo.json @@ -415,6 +415,78 @@ } ] }, + { + "Command": "BZMPOP", + "Name": "BZMPOP", + "Arity": -5, + "Flags": "Blocking, MovableKeys, Write", + "AclCategories": "Blocking, SortedSet, Slow, Write", + "KeySpecifications": [ + { + "BeginSearch": { + "TypeDiscriminator": "BeginSearchIndex", + "Index": 2 + }, + "FindKeys": { + "TypeDiscriminator": "FindKeysKeyNum", + "KeyNumIdx": 0, + "FirstKey": 1, + "KeyStep": 1 + }, + "Flags": "RW, Access, Delete" + } + ] + }, + { + "Command": "BZPOPMAX", + "Name": "BZPOPMAX", + "Arity": -3, + "Flags": "Blocking, Fast, Write", + "FirstKey": 1, + "LastKey": -2, + "Step": 1, + "AclCategories": "Blocking, Fast, SortedSet, Write", + "KeySpecifications": [ + { + "BeginSearch": { + "TypeDiscriminator": "BeginSearchIndex", + "Index": 1 + }, + "FindKeys": { + "TypeDiscriminator": "FindKeysRange", + "LastKey": -2, + "KeyStep": 1, + "Limit": 0 + }, + "Flags": "RW, Access, Delete" + } + ] + }, + { + "Command": "BZPOPMIN", + "Name": "BZPOPMIN", + "Arity": -3, + "Flags": "Blocking, Fast, Write", + "FirstKey": 1, + "LastKey": -2, + "Step": 1, + "AclCategories": "Blocking, Fast, SortedSet, Write", + "KeySpecifications": [ + { + "BeginSearch": { + "TypeDiscriminator": "BeginSearchIndex", + "Index": 1 + }, + "FindKeys": { + "TypeDiscriminator": "FindKeysRange", + "LastKey": -2, + "KeyStep": 1, + "Limit": 0 + }, + "Flags": "RW, Access, Delete" + } + ] + }, { "Command": "CLIENT", "Name": "CLIENT", diff --git a/libs/server/Objects/ItemBroker/CollectionItemBroker.cs b/libs/server/Objects/ItemBroker/CollectionItemBroker.cs index 82a2158b07..37d608e715 100644 --- a/libs/server/Objects/ItemBroker/CollectionItemBroker.cs +++ b/libs/server/Objects/ItemBroker/CollectionItemBroker.cs @@ -383,37 +383,44 @@ private static bool TryMoveNextListItem(ListObject srcListObj, ListObject dstLis } /// - /// Try to get next available item from sorted set object + /// Try to get next available item from sorted set object based on command type + /// BZPOPMIN and BZPOPMAX share same implementation since Dictionary.First() and Last() + /// handle the ordering automatically based on sorted set scores /// - /// Sorted set object - /// RESP command - /// Item retrieved - /// True if found available item - private static bool TryGetNextSetObject(SortedSetObject sortedSetObj, RespCommand command, out byte[] nextItem) + private static unsafe bool TryGetNextSetObjects(byte[] key, SortedSetObject sortedSetObj, RespCommand command, ArgSlice[] cmdArgs, out CollectionItemResult result) { - nextItem = default; - - // If object has no items, return + result = default; + if (sortedSetObj.Dictionary.Count == 0) return false; - // Get the next object according to operation type switch (command) { + case RespCommand.BZPOPMIN: + case RespCommand.BZPOPMAX: + var element = sortedSetObj.Pop(command == RespCommand.BZPOPMAX); + result = new CollectionItemResult(key, [element]); + return true; + + case RespCommand.BZMPOP: + var lowScoresFirst = *(bool*)cmdArgs[0].ptr; + var popCount = *(int*)cmdArgs[1].ptr; + popCount = Math.Min(popCount, sortedSetObj.Dictionary.Count); + + var scoredItems = new (double Score, byte[] Element)[popCount]; + + for (int i = 0; i < popCount; i++) + { + scoredItems[i] = sortedSetObj.Pop(!lowScoresFirst); + } + + result = new CollectionItemResult(key, scoredItems); + return true; + default: return false; } } - /// - /// Try to get next available item from object - /// - /// Key of object - /// Current storage session - /// RESP command - /// Additional command arguments - /// Collection size - /// Result of command - /// True if found available item private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, RespCommand command, ArgSlice[] cmdArgs, out int currCount, out CollectionItemResult result) { currCount = default; @@ -423,6 +430,7 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp var objectType = command switch { RespCommand.BLPOP or RespCommand.BRPOP or RespCommand.BLMOVE or RespCommand.BLMPOP => GarnetObjectType.List, + RespCommand.BZPOPMIN or RespCommand.BZPOPMAX or RespCommand.BZMPOP => GarnetObjectType.SortedSet, _ => throw new NotSupportedException() }; @@ -524,11 +532,13 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp } case SortedSetObject setObj: currCount = setObj.Dictionary.Count; - if (objectType != GarnetObjectType.SortedSet) return false; + if (objectType != GarnetObjectType.SortedSet) + return false; + if (currCount == 0) + return false; + + return TryGetNextSetObjects(key, setObj, command, cmdArgs, out result); - var hasValue = TryGetNextSetObject(setObj, command, out var sortedSetNextItem); - result = new CollectionItemResult(key, sortedSetNextItem); - return hasValue; default: return false; } diff --git a/libs/server/Objects/ItemBroker/CollectionItemResult.cs b/libs/server/Objects/ItemBroker/CollectionItemResult.cs index a3fc8311d5..d92f01ba2c 100644 --- a/libs/server/Objects/ItemBroker/CollectionItemResult.cs +++ b/libs/server/Objects/ItemBroker/CollectionItemResult.cs @@ -20,6 +20,12 @@ public CollectionItemResult(byte[] key, byte[][] items) Items = items; } + public CollectionItemResult(byte[] key, (double Score, byte[] Element)[] scoredItems) + { + Key = key; + ScoredItems = scoredItems; + } + /// /// True if item was found /// @@ -40,6 +46,11 @@ public CollectionItemResult(byte[] key, byte[][] items) /// internal byte[][] Items { get; } + /// + /// Scored items retrieved from collection, where each item has an associated score. + /// + internal (double Score, byte[] Element)[] ScoredItems { get; } + /// /// Instance of empty result /// diff --git a/libs/server/Objects/SortedSet/SortedSetObjectImpl.cs b/libs/server/Objects/SortedSet/SortedSetObjectImpl.cs index b1480f455e..03ef1aecf6 100644 --- a/libs/server/Objects/SortedSet/SortedSetObjectImpl.cs +++ b/libs/server/Objects/SortedSet/SortedSetObjectImpl.cs @@ -6,6 +6,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Runtime.Intrinsics.X86; using System.Text; using Garnet.common; using Tsavorite.core; @@ -886,6 +887,19 @@ private void SortedSetRank(ref ObjectInput input, ref SpanByteAndMemory output, } } + public (double Score, byte[] Element) Pop(bool popMaxScoreElement = false) + { + if (sortedSet.Count == 0) + return default; + + var element = popMaxScoreElement ? sortedSet.Max : sortedSet.Min; + sortedSet.Remove(element); + sortedSetDict.Remove(element.Element); + this.UpdateSize(element.Element, false); + + return element; + } + /// /// Removes and returns up to COUNT members with the low or high score /// diff --git a/libs/server/Resp/Objects/SortedSetCommands.cs b/libs/server/Resp/Objects/SortedSetCommands.cs index 6289f23d9e..862c643a13 100644 --- a/libs/server/Resp/Objects/SortedSetCommands.cs +++ b/libs/server/Resp/Objects/SortedSetCommands.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using System; +using System.Linq; using System.Text; using Garnet.common; using Tsavorite.core; @@ -1028,5 +1029,166 @@ private unsafe bool SortedSetDifferenceStore(ref TGarnetApi storageA return true; } + + /// + /// BZPOPMIN/BZPOPMAX key [key ...] timeout + /// + private unsafe bool SortedSetBlockingPop(RespCommand command) + { + if (parseState.Count < 2) + { + return AbortWithWrongNumberOfArguments(command.ToString()); + } + + var keysBytes = new byte[parseState.Count - 1][]; + + for (var i = 0; i < keysBytes.Length; i++) + { + keysBytes[i] = parseState.GetArgSliceByRef(i).SpanByte.ToByteArray(); + } + + if (!parseState.TryGetDouble(parseState.Count - 1, out var timeout)) + { + return AbortWithErrorMessage(CmdStrings.RESP_ERR_TIMEOUT_NOT_VALID_FLOAT); + } + + if (storeWrapper.itemBroker == null) + throw new GarnetException("Object store is disabled"); + + var result = storeWrapper.itemBroker.GetCollectionItemAsync(command, keysBytes, this, timeout).Result; + + if (!result.Found) + { + while (!RespWriteUtils.WriteNull(ref dcurr, dend)) + SendAndReset(); + } + else + { + while (!RespWriteUtils.WriteArrayLength(3, ref dcurr, dend)) + SendAndReset(); + + while (!RespWriteUtils.WriteBulkString(result.Key, ref dcurr, dend)) + SendAndReset(); + + var memberAndScore = result.ScoredItems; + while (!RespWriteUtils.WriteBulkString(memberAndScore[0].Element, ref dcurr, dend)) + SendAndReset(); + + while (!RespWriteUtils.TryWriteDoubleBulkString(memberAndScore[0].Score, ref dcurr, dend)) + SendAndReset(); + } + + return true; + } + + /// + /// BZMPOP timeout numkeys key [key ...] <MIN | MAX> [COUNT count] + /// + private unsafe bool SortedSetBlockingMPop() + { + if (storeWrapper.itemBroker == null) + throw new GarnetException("Object store is disabled"); + + if (parseState.Count < 4) + { + return AbortWithWrongNumberOfArguments(nameof(RespCommand.BZMPOP)); + } + + var currTokenId = 0; + + // Read timeout + if (!parseState.TryGetDouble(currTokenId++, out var timeout)) + { + return AbortWithErrorMessage(CmdStrings.RESP_ERR_TIMEOUT_NOT_VALID_FLOAT); + } + + // Read count of keys + if (!parseState.TryGetInt(currTokenId++, out var numKeys)) + { + var err = string.Format(CmdStrings.GenericParamShouldBeGreaterThanZero, "numkeys"); + return AbortWithErrorMessage(Encoding.ASCII.GetBytes(err)); + } + + // Should have MAX|MIN or it should contain COUNT + value + if (parseState.Count != numKeys + 3 && parseState.Count != numKeys + 5) + { + return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR); + } + + var keysBytes = new byte[numKeys][]; + for (var i = 0; i < keysBytes.Length; i++) + { + keysBytes[i] = parseState.GetArgSliceByRef(currTokenId++).SpanByte.ToByteArray(); + } + + var cmdArgs = new ArgSlice[2]; + + var orderArg = parseState.GetArgSliceByRef(currTokenId++); + var orderSpan = orderArg.ReadOnlySpan; + bool lowScoresFirst; + + if (orderSpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.MIN)) + lowScoresFirst = true; + else if (orderSpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.MAX)) + lowScoresFirst = false; + else + { + return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR); + } + + cmdArgs[0] = new ArgSlice((byte*)&lowScoresFirst, 1); + + var popCount = 1; + + if (parseState.Count == numKeys + 5) + { + var countKeyword = parseState.GetArgSliceByRef(currTokenId++); + + if (!countKeyword.ReadOnlySpan.EqualsUpperCaseSpanIgnoringCase(CmdStrings.COUNT)) + { + return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR); + } + + if (!parseState.TryGetInt(currTokenId, out popCount) || popCount < 1) + { + var err = string.Format(CmdStrings.GenericParamShouldBeGreaterThanZero, "count"); + return AbortWithErrorMessage(Encoding.ASCII.GetBytes(err)); + } + } + + cmdArgs[1] = new ArgSlice((byte*)&popCount, sizeof(int)); + + var result = storeWrapper.itemBroker.GetCollectionItemAsync(RespCommand.BZMPOP, keysBytes, this, timeout, cmdArgs).Result; + + if (!result.Found) + { + while (!RespWriteUtils.WriteNull(ref dcurr, dend)) + SendAndReset(); + return true; + } + + // Write array with 2 elements: key and array of member-score pairs + while (!RespWriteUtils.WriteArrayLength(2, ref dcurr, dend)) + SendAndReset(); + + while (!RespWriteUtils.WriteBulkString(result.Key, ref dcurr, dend)) + SendAndReset(); + + var pairs = result.ScoredItems; + while (!RespWriteUtils.WriteArrayLength(pairs.Length, ref dcurr, dend)) + SendAndReset(); + + for (var i = 0; i < pairs.Length; i += 2) + { + while (!RespWriteUtils.WriteArrayLength(2, ref dcurr, dend)) + SendAndReset(); + while (!RespWriteUtils.WriteBulkString(pairs[i].Element, ref dcurr, dend)) + SendAndReset(); + while (!RespWriteUtils.TryWriteDoubleBulkString(pairs[i].Score, ref dcurr, dend)) + SendAndReset(); + } + + return true; + } } } \ No newline at end of file diff --git a/libs/server/Resp/Parser/RespCommand.cs b/libs/server/Resp/Parser/RespCommand.cs index 5cf5077592..dd6f65a63e 100644 --- a/libs/server/Resp/Parser/RespCommand.cs +++ b/libs/server/Resp/Parser/RespCommand.cs @@ -91,6 +91,9 @@ public enum RespCommand : ushort // Write commands APPEND, // Note: Update FirstWriteCommand if adding new write commands before this BITFIELD, + BZMPOP, + BZPOPMAX, + BZPOPMIN, DECR, DECRBY, DEL, @@ -1069,6 +1072,10 @@ private RespCommand FastParseArrayCommand(ref int count, ref ReadOnlySpan { return RespCommand.BLMPOP; } + else if (*(ulong*)(ptr + 4) == MemoryMarshal.Read("BZMPOP\r\n"u8)) + { + return RespCommand.BZMPOP; + } break; case 'D': if (*(ulong*)(ptr + 4) == MemoryMarshal.Read("DBSIZE\r\n"u8)) @@ -1338,6 +1345,14 @@ private RespCommand FastParseArrayCommand(ref int count, ref ReadOnlySpan { return RespCommand.EXPIREAT; } + else if (*(ulong*)(ptr + 4) == MemoryMarshal.Read("BZPOPMAX"u8) && *(ushort*)(ptr + 12) == MemoryMarshal.Read("\r\n"u8)) + { + return RespCommand.BZPOPMAX; + } + else if (*(ulong*)(ptr + 4) == MemoryMarshal.Read("BZPOPMIN"u8) && *(ushort*)(ptr + 12) == MemoryMarshal.Read("\r\n"u8)) + { + return RespCommand.BZPOPMIN; + } break; case 9: if (*(ulong*)(ptr + 4) == MemoryMarshal.Read("SUBSCRIB"u8) && *(uint*)(ptr + 11) == MemoryMarshal.Read("BE\r\n"u8)) diff --git a/libs/server/Resp/RespServerSession.cs b/libs/server/Resp/RespServerSession.cs index a5b00f020c..c5163c4254 100644 --- a/libs/server/Resp/RespServerSession.cs +++ b/libs/server/Resp/RespServerSession.cs @@ -619,6 +619,9 @@ private bool ProcessArrayCommands(RespCommand cmd, ref TGarnetApi st RespCommand.ZRANDMEMBER => SortedSetRandomMember(ref storageApi), RespCommand.ZDIFF => SortedSetDifference(ref storageApi), RespCommand.ZDIFFSTORE => SortedSetDifferenceStore(ref storageApi), + RespCommand.BZMPOP => SortedSetBlockingMPop(), + RespCommand.BZPOPMAX => SortedSetBlockingPop(cmd), + RespCommand.BZPOPMIN => SortedSetBlockingPop(cmd), RespCommand.ZREVRANGE => SortedSetRange(cmd, ref storageApi), RespCommand.ZREVRANGEBYLEX => SortedSetRange(cmd, ref storageApi), RespCommand.ZREVRANGEBYSCORE => SortedSetRange(cmd, ref storageApi), diff --git a/libs/server/Storage/Session/ObjectStore/SortedSetOps.cs b/libs/server/Storage/Session/ObjectStore/SortedSetOps.cs index 72b7896be9..371c3a27d5 100644 --- a/libs/server/Storage/Session/ObjectStore/SortedSetOps.cs +++ b/libs/server/Storage/Session/ObjectStore/SortedSetOps.cs @@ -43,7 +43,9 @@ public unsafe GarnetStatus SortedSetAdd(ArgSlice key, ArgSlice s var outputFooter = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(null) }; - var status = RMWObjectStoreOperationWithOutput(key.ToArray(), ref input, ref objectStoreContext, ref outputFooter); + var keyBytes = key.ToArray(); + var status = RMWObjectStoreOperationWithOutput(keyBytes, ref input, ref objectStoreContext, ref outputFooter); + itemBroker.HandleCollectionUpdate(keyBytes); if (status == GarnetStatus.OK) { @@ -84,7 +86,9 @@ public unsafe GarnetStatus SortedSetAdd(ArgSlice key, (ArgSlice var outputFooter = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(null) }; - var status = RMWObjectStoreOperationWithOutput(key.ToArray(), ref input, ref objectStoreContext, ref outputFooter); + var keyBytes = key.ToArray(); + var status = RMWObjectStoreOperationWithOutput(keyBytes, ref input, ref objectStoreContext, ref outputFooter); + itemBroker.HandleCollectionUpdate(keyBytes); if (status == GarnetStatus.OK) { @@ -608,7 +612,7 @@ public GarnetStatus SortedSetDifferenceStore(ArgSlice destinationKey, ReadOnlySp } count = pairs?.Count ?? 0; - + var destinationKeyBty = destinationKey.ToArray(); if (count > 0) { SortedSetObject newSetObject = new(); @@ -616,7 +620,7 @@ public GarnetStatus SortedSetDifferenceStore(ArgSlice destinationKey, ReadOnlySp { newSetObject.Add(element, score); } - _ = SET(destinationKey.ToArray(), newSetObject, ref objectContext); + _ = SET(destinationKeyBty, newSetObject, ref objectContext); } else { @@ -624,6 +628,8 @@ public GarnetStatus SortedSetDifferenceStore(ArgSlice destinationKey, ReadOnlySp ref lockableContext, ref objectContext); } + itemBroker.HandleCollectionUpdate(destinationKeyBty); + return status; } finally @@ -690,7 +696,11 @@ public unsafe GarnetStatus SortedSetRank(ArgSlice key, ArgSlice /// public GarnetStatus SortedSetAdd(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput output, ref TObjectContext objectStoreContext) where TObjectContext : ITsavoriteContext - => RMWObjectStoreOperationWithOutput(key, ref input, ref objectStoreContext, ref output); + { + var status = RMWObjectStoreOperationWithOutput(key, ref input, ref objectStoreContext, ref output); + itemBroker.HandleCollectionUpdate(key); + return status; + } /// /// ZRANGESTORE - Stores a range of sorted set elements into a destination key. @@ -782,6 +792,7 @@ public unsafe GarnetStatus SortedSetRangeStore(ArgSlice dstKey, var zAddOutput = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(null) }; RMWObjectStoreOperationWithOutput(destinationKey, ref zAddInput, ref objectStoreLockableContext, ref zAddOutput); + itemBroker.HandleCollectionUpdate(destinationKey); } } finally diff --git a/playground/CommandInfoUpdater/SupportedCommand.cs b/playground/CommandInfoUpdater/SupportedCommand.cs index c40c688f96..f400fc861c 100644 --- a/playground/CommandInfoUpdater/SupportedCommand.cs +++ b/playground/CommandInfoUpdater/SupportedCommand.cs @@ -37,6 +37,9 @@ public class SupportedCommand new("BRPOP", RespCommand.BRPOP), new("BLMOVE", RespCommand.BLMOVE), new("BRPOPLPUSH", RespCommand.BRPOPLPUSH), + new("BZMPOP", RespCommand.BZMPOP), + new("BZPOPMAX", RespCommand.BZPOPMAX), + new("BZPOPMIN", RespCommand.BZPOPMIN), new("BLMPOP", RespCommand.BLMPOP), new("CLIENT", RespCommand.CLIENT, [ diff --git a/test/Garnet.test/Resp/ACL/RespCommandTests.cs b/test/Garnet.test/Resp/ACL/RespCommandTests.cs index cda4ccae7e..55d9645e5c 100644 --- a/test/Garnet.test/Resp/ACL/RespCommandTests.cs +++ b/test/Garnet.test/Resp/ACL/RespCommandTests.cs @@ -3780,6 +3780,51 @@ static async Task DoBRPopAsync(GarnetClient client) } } + [Test] + public async Task BZMPopACLsAsync() + { + await CheckCommandsAsync( + "BZMPOP", + [DoBZMPopAsync] + ); + + static async Task DoBZMPopAsync(GarnetClient client) + { + var val = await client.ExecuteForStringResultAsync("BZMPOP", ["1", "1", "foo", "MIN"]); + ClassicAssert.IsNull(val); + } + } + + [Test] + public async Task BZPopMaxACLsAsync() + { + await CheckCommandsAsync( + "BZPOPMAX", + [DoBZPopMaxAsync] + ); + + static async Task DoBZPopMaxAsync(GarnetClient client) + { + var val = await client.ExecuteForStringResultAsync("BZPOPMAX", ["foo", "1"]); + ClassicAssert.IsNull(val); + } + } + + [Test] + public async Task BZPopMinACLsAsync() + { + await CheckCommandsAsync( + "BZPOPMIN", + [DoBZPopMinAsync] + ); + + static async Task DoBZPopMinAsync(GarnetClient client) + { + var val = await client.ExecuteForStringResultAsync("BZPOPMIN", ["foo", "1"]); + ClassicAssert.IsNull(val); + } + } + [Test] public async Task LPopACLsAsync() { diff --git a/test/Garnet.test/RespBlockingListTests.cs b/test/Garnet.test/RespBlockingTests.cs similarity index 69% rename from test/Garnet.test/RespBlockingListTests.cs rename to test/Garnet.test/RespBlockingTests.cs index 2e92524161..dcaa55d4e7 100644 --- a/test/Garnet.test/RespBlockingListTests.cs +++ b/test/Garnet.test/RespBlockingTests.cs @@ -10,7 +10,7 @@ namespace Garnet.test { - public class RespBlockingListTests + public class RespBlockingTests { GarnetServer server; private TaskFactory taskFactory = new(); @@ -389,14 +389,154 @@ public void BlmpopBlockingWithCountTest() var pushingTask = taskFactory.StartNew(() => { using var lcr = TestUtils.CreateRequest(); - Task.Delay(TimeSpan.FromSeconds(2)).Wait(); + Task.Delay(TimeSpan.FromSeconds(5)).Wait(); return lcr.SendCommand($"RPUSH {key} {string.Join(" ", values)}"); }); + Task.WaitAll([blockingTask, pushingTask], TimeSpan.FromSeconds(10)); + ClassicAssert.IsTrue(blockingTask.IsCompletedSuccessfully); + ClassicAssert.IsTrue(pushingTask.IsCompletedSuccessfully); + } + + [Test] + [TestCase("MIN", "value1", 1.5, Description = "Pop minimum score")] + [TestCase("MAX", "value3", 3.5, Description = "Pop maximum score")] + public void BasicBzmpopTest(string mode, string expectedValue, double expectedScore) + { + var key = "mykey"; + using var lightClientRequest = TestUtils.CreateRequest(); + + lightClientRequest.SendCommand($"ZADD {key} 1.5 value1 2.5 value2 3.5 value3"); + var response = lightClientRequest.SendCommand($"BZMPOP 1 1 {key} {mode}"); + var expectedResponse = $"*2\r\n${key.Length}\r\n{key}\r\n*1\r\n*2\r\n${expectedValue.Length}\r\n{expectedValue}\r\n${expectedScore.ToString().Length}\r\n{expectedScore}\r\n"; + var actualValue = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length); + ClassicAssert.AreEqual(expectedResponse, actualValue); + } + + [Test] + [TestCase(1, "key1", "value1", 1.5, Description = "First key has minimum value")] + [TestCase(2, "key2", "value2", 2.5, Description = "Second key has minimum value")] + public void BzmpopMultipleKeysTest(int valueKeyIndex, string expectedKey, string expectedValue, double expectedScore) + { + var keys = new[] { "key1", "key2", "key3" }; + using var lightClientRequest = TestUtils.CreateRequest(); + + lightClientRequest.SendCommand($"ZADD {keys[valueKeyIndex - 1]} {expectedScore} {expectedValue}"); + var response = lightClientRequest.SendCommand($"BZMPOP 1 {keys.Length} {string.Join(" ", keys)} MIN"); + var expectedResponse = $"*2\r\n${expectedKey.Length}\r\n{expectedKey}\r\n*1\r\n*2\r\n${expectedValue.Length}\r\n{expectedValue}\r\n${expectedScore.ToString().Length}\r\n{expectedScore}\r\n"; + var actualValue = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length); + ClassicAssert.AreEqual(expectedResponse, actualValue); + } + + [Test] + public void BzmpopTimeoutTest() + { + using var lightClientRequest = TestUtils.CreateRequest(); + var response = lightClientRequest.SendCommand("BZMPOP 1 1 nonexistentkey MIN"); + var expectedResponse = "$-1\r\n"; + var actualValue = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length); + ClassicAssert.AreEqual(expectedResponse, actualValue); + } + + [Test] + public void BzmpopBlockingBehaviorTest() + { + var key = "blockingzset"; + var value = "testvalue"; + var score = 1.5; + + var blockingTask = taskFactory.StartNew(() => + { + using var lcr = TestUtils.CreateRequest(); + var response = lcr.SendCommand($"BZMPOP 30 1 {key} MIN COUNT 2"); + var expectedResponse = $"*2\r\n${key.Length}\r\n{key}\r\n*1\r\n*2\r\n${value.Length}\r\n{value}\r\n${score.ToString().Length}\r\n{score}\r\n"; + var actualValue = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length); + ClassicAssert.AreEqual(expectedResponse, actualValue); + }); + + var pushingTask = taskFactory.StartNew(() => + { + using var lcr = TestUtils.CreateRequest(); + Task.Delay(TimeSpan.FromSeconds(2)).Wait(); + var result = lcr.SendCommand($"ZADD {key} {score} {value}"); + return result; + }); + + Task.WaitAll([blockingTask, pushingTask], TimeSpan.FromSeconds(10)); + ClassicAssert.IsTrue(blockingTask.IsCompletedSuccessfully); + ClassicAssert.IsTrue(pushingTask.IsCompletedSuccessfully); + } + + [Test] + [TestCase("BZPOPMIN", "value1", 1.5, Description = "Pop minimum score")] + [TestCase("BZPOPMAX", "value3", 3.5, Description = "Pop maximum score")] + public void BasicBzpopMinMaxTest(string command, string expectedValue, double expectedScore) + { + var key = "zsettestkey"; + using var lightClientRequest = TestUtils.CreateRequest(); + + lightClientRequest.SendCommand($"ZADD {key} 1.5 value1 2.5 value2 3.5 value3"); + var response = lightClientRequest.SendCommand($"{command} {key} 1"); + var expectedResponse = $"*3\r\n${key.Length}\r\n{key}\r\n${expectedValue.Length}\r\n{expectedValue}\r\n${expectedScore.ToString().Length}\r\n{expectedScore}\r\n"; + var actualValue = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length); + ClassicAssert.AreEqual(expectedResponse, actualValue); + } + + [Test] + [TestCase("BZPOPMIN", 1, "key1", "value1", 1.5, Description = "First key has minimum")] + [TestCase("BZPOPMAX", 2, "key2", "value2", 3.5, Description = "Second key has maximum")] + public void BzpopMinMaxMultipleKeysTest(string command, int valueKeyIndex, string expectedKey, string expectedValue, double expectedScore) + { + var keys = new[] { "key1", "key2", "key3" }; + using var lightClientRequest = TestUtils.CreateRequest(); + + lightClientRequest.SendCommand($"ZADD {keys[valueKeyIndex - 1]} {expectedScore} {expectedValue}"); + var response = lightClientRequest.SendCommand($"{command} {string.Join(" ", keys)} 1"); + var expectedResponse = $"*3\r\n${expectedKey.Length}\r\n{expectedKey}\r\n${expectedValue.Length}\r\n{expectedValue}\r\n${expectedScore.ToString().Length}\r\n{expectedScore}\r\n"; + var actualValue = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length); + ClassicAssert.AreEqual(expectedResponse, actualValue); + } + + [Test] + [TestCase("BZPOPMIN", Description = "Blocking pop minimum")] + [TestCase("BZPOPMAX", Description = "Blocking pop maximum")] + public void BzpopMinMaxBlockingTest(string command) + { + var key = "blockingzset2"; + var value = "testvalue"; + var score = 2.5; + + var blockingTask = taskFactory.StartNew(() => + { + using var lcr = TestUtils.CreateRequest(); + var response = lcr.SendCommand($"{command} {key} 30"); + var expectedResponse = $"*3\r\n${key.Length}\r\n{key}\r\n${value.Length}\r\n{value}\r\n${score.ToString().Length}\r\n{score}\r\n"; + var actualValue = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length); + ClassicAssert.AreEqual(expectedResponse, actualValue); + }); + + var pushingTask = taskFactory.StartNew(() => + { + using var lcr = TestUtils.CreateRequest(); + Task.Delay(TimeSpan.FromSeconds(2)).Wait(); + return lcr.SendCommand($"ZADD {key} {score} {value}"); + }); + Task.WaitAll([blockingTask, pushingTask], TimeSpan.FromSeconds(5)); ClassicAssert.IsTrue(blockingTask.IsCompletedSuccessfully); ClassicAssert.IsTrue(pushingTask.IsCompletedSuccessfully); } + [Test] + [TestCase("BZPOPMIN", Description = "Timeout on minimum")] + [TestCase("BZPOPMAX", Description = "Timeout on maximum")] + public void BzpopMinMaxTimeoutTest(string command) + { + using var lightClientRequest = TestUtils.CreateRequest(); + var response = lightClientRequest.SendCommand($"{command} nonexistentkey 1"); + var expectedResponse = "$-1\r\n"; + var actualValue = Encoding.ASCII.GetString(response).Substring(0, expectedResponse.Length); + ClassicAssert.AreEqual(expectedResponse, actualValue); + } } } \ No newline at end of file diff --git a/website/docs/commands/api-compatibility.md b/website/docs/commands/api-compatibility.md index 3e9ba3f4bc..ff880c15fe 100644 --- a/website/docs/commands/api-compatibility.md +++ b/website/docs/commands/api-compatibility.md @@ -318,9 +318,9 @@ Note that this list is subject to change as we continue to expand our API comman | | HELP | ➖ | | | | LEN | ➖ | | | | RESET | ➖ | | -| **SORTED SET** | BZMPOP | ➖ | | -| | BZPOPMAX | ➖ | | -| | BZPOPMIN | ➖ | | +| **SORTED SET** | [BZMPOP](data-structures.md#bzmpop) | ➕ | | +| | [BZPOPMAX](data-structures.md#bzpopmax) | ➕ | | +| | [BZPOPMIN](data-structures.md#bzpopmin) | ➕ | | | | [ZADD](data-structures.md#zadd) | ➕ | | | | [ZCARD](data-structures.md#zcard) | ➕ | | | | [ZCOUNT](data-structures.md#zcount) | ➕ | | diff --git a/website/docs/commands/data-structures.md b/website/docs/commands/data-structures.md index fccfe83e2e..e6e1dcb203 100644 --- a/website/docs/commands/data-structures.md +++ b/website/docs/commands/data-structures.md @@ -871,6 +871,66 @@ _Array reply:_ a list of string **member** scores as double-precision floating p --- +### BZMPOP + +#### Syntax + +```bash + BZMPOP timeout numkeys key [key ...] [COUNT count] +``` + +BZMPOP is the blocking variant of [ZMPOP](#zmpop). When any of the sorted sets contains elements, this command behaves exactly like ZMPOP. When used inside a MULTI/EXEC block, this command behaves exactly like ZMPOP. When all sorted sets are empty, Garnet will block the connection until another client pushes to it or until timeout (a double value specifying the maximum number of seconds to block) is reached. A timeout of zero can be used to block indefinitely. + +- **MIN**: Remove elements starting with the lowest scores +- **MAX**: Remove elements starting with the highest scores +- **COUNT**: Specifies how many elements to pop (default is 1) + +#### Resp Reply + +One of the following: + +* Null reply: when no element could be popped. +* Array reply: a two-element array with the first element being the name of the key from which elements were popped, and the second element is an array of the popped elements. Every entry in the elements array is also an array that contains the member and its score. +--- + +### BZPOPMAX + +#### Syntax + +```bash + BZPOPMAX key [key ...] timeout +``` + +BZPOPMAX is the blocking variant of [ZPOPMAX](#zpopmax). When any of the sorted sets contains elements, this command behaves exactly like ZPOPMAX. When used inside a MULTI/EXEC block, this command behaves exactly like ZPOPMAX. When all sorted sets are empty, Garnet will block the connection until another client pushes to it or until timeout (a double value specifying the maximum number of seconds to block) is reached. A timeout of zero can be used to block indefinitely. + +#### Resp Reply + +One of the following: + +* Null reply: when no element could be popped and the timeout expired. +* Array reply: the keyname, popped member, and its score. + +--- + +### BZPOPMIN + +#### Syntax + +```bash + BZPOPMIN key [key ...] timeout +``` + +BZPOPMIN is the blocking variant of [ZPOPMIN](#zpopmin). When any of the sorted sets contains elements, this command behaves exactly like ZPOPMIN. When used inside a MULTI/EXEC block, this command behaves exactly like ZPOPMIN. When all sorted sets are empty, Garnet will block the connection until another client pushes to it or until timeout (a double value specifying the maximum number of seconds to block) is reached. A timeout of zero can be used to block indefinitely. + +#### Resp Reply + +One of the following: + +* Null reply: when no element could be popped and the timeout expired. +* Array reply: the keyname, popped member, and its score. + +--- + ### ZMPOP #### Syntax From 15d1b186014c3a9c6f0f3173b5bb89965e47344e Mon Sep 17 00:00:00 2001 From: Vijay-Nirmal Date: Mon, 16 Dec 2024 00:39:29 +0530 Subject: [PATCH 2/4] Fixed code format --- libs/server/Objects/ItemBroker/CollectionItemBroker.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/server/Objects/ItemBroker/CollectionItemBroker.cs b/libs/server/Objects/ItemBroker/CollectionItemBroker.cs index 37d608e715..81eff0e585 100644 --- a/libs/server/Objects/ItemBroker/CollectionItemBroker.cs +++ b/libs/server/Objects/ItemBroker/CollectionItemBroker.cs @@ -390,7 +390,7 @@ private static bool TryMoveNextListItem(ListObject srcListObj, ListObject dstLis private static unsafe bool TryGetNextSetObjects(byte[] key, SortedSetObject sortedSetObj, RespCommand command, ArgSlice[] cmdArgs, out CollectionItemResult result) { result = default; - + if (sortedSetObj.Dictionary.Count == 0) return false; switch (command) From fddbb12faf80809c108d6fc4c7646081d1bf13c5 Mon Sep 17 00:00:00 2001 From: Vijay-Nirmal Date: Sun, 12 Jan 2025 01:09:47 +0530 Subject: [PATCH 3/4] Review command fix --- .../Objects/ItemBroker/CollectionItemBroker.cs | 4 ++-- .../Objects/SortedSet/SortedSetObjectImpl.cs | 8 ++++++-- libs/server/Resp/Objects/SortedSetCommands.cs | 17 ++++++++--------- .../Session/ObjectStore/SortedSetOps.cs | 18 ++++++++++++------ ...Tests.cs => RespBlockingCollectionTests.cs} | 4 ++-- 5 files changed, 30 insertions(+), 21 deletions(-) rename test/Garnet.test/{RespBlockingTests.cs => RespBlockingCollectionTests.cs} (99%) diff --git a/libs/server/Objects/ItemBroker/CollectionItemBroker.cs b/libs/server/Objects/ItemBroker/CollectionItemBroker.cs index 81eff0e585..eb66f29a07 100644 --- a/libs/server/Objects/ItemBroker/CollectionItemBroker.cs +++ b/libs/server/Objects/ItemBroker/CollectionItemBroker.cs @@ -397,7 +397,7 @@ private static unsafe bool TryGetNextSetObjects(byte[] key, SortedSetObject sort { case RespCommand.BZPOPMIN: case RespCommand.BZPOPMAX: - var element = sortedSetObj.Pop(command == RespCommand.BZPOPMAX); + var element = sortedSetObj.PopMinOrMax(command == RespCommand.BZPOPMAX); result = new CollectionItemResult(key, [element]); return true; @@ -410,7 +410,7 @@ private static unsafe bool TryGetNextSetObjects(byte[] key, SortedSetObject sort for (int i = 0; i < popCount; i++) { - scoredItems[i] = sortedSetObj.Pop(!lowScoresFirst); + scoredItems[i] = sortedSetObj.PopMinOrMax(!lowScoresFirst); } result = new CollectionItemResult(key, scoredItems); diff --git a/libs/server/Objects/SortedSet/SortedSetObjectImpl.cs b/libs/server/Objects/SortedSet/SortedSetObjectImpl.cs index 03ef1aecf6..531f945552 100644 --- a/libs/server/Objects/SortedSet/SortedSetObjectImpl.cs +++ b/libs/server/Objects/SortedSet/SortedSetObjectImpl.cs @@ -6,7 +6,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using System.Runtime.Intrinsics.X86; using System.Text; using Garnet.common; using Tsavorite.core; @@ -887,7 +886,12 @@ private void SortedSetRank(ref ObjectInput input, ref SpanByteAndMemory output, } } - public (double Score, byte[] Element) Pop(bool popMaxScoreElement = false) + /// + /// Removes and returns the element with the highest or lowest score from the sorted set. + /// + /// If true, pops the element with the highest score; otherwise, pops the element with the lowest score. + /// A tuple containing the score and the element as a byte array. + public (double Score, byte[] Element) PopMinOrMax(bool popMaxScoreElement = false) { if (sortedSet.Count == 0) return default; diff --git a/libs/server/Resp/Objects/SortedSetCommands.cs b/libs/server/Resp/Objects/SortedSetCommands.cs index 19f418b569..a96302f489 100644 --- a/libs/server/Resp/Objects/SortedSetCommands.cs +++ b/libs/server/Resp/Objects/SortedSetCommands.cs @@ -2,7 +2,6 @@ // Licensed under the MIT license. using System; -using System.Linq; using System.Text; using Garnet.common; using Tsavorite.core; @@ -1530,11 +1529,19 @@ private unsafe bool SortedSetUnionStore(ref TGarnetApi storageApi) /// private unsafe bool SortedSetBlockingPop(RespCommand command) { + if (storeWrapper.itemBroker == null) + throw new GarnetException("Object store is disabled"); + if (parseState.Count < 2) { return AbortWithWrongNumberOfArguments(command.ToString()); } + if (!parseState.TryGetDouble(parseState.Count - 1, out var timeout)) + { + return AbortWithErrorMessage(CmdStrings.RESP_ERR_TIMEOUT_NOT_VALID_FLOAT); + } + var keysBytes = new byte[parseState.Count - 1][]; for (var i = 0; i < keysBytes.Length; i++) @@ -1542,14 +1549,6 @@ private unsafe bool SortedSetBlockingPop(RespCommand command) keysBytes[i] = parseState.GetArgSliceByRef(i).SpanByte.ToByteArray(); } - if (!parseState.TryGetDouble(parseState.Count - 1, out var timeout)) - { - return AbortWithErrorMessage(CmdStrings.RESP_ERR_TIMEOUT_NOT_VALID_FLOAT); - } - - if (storeWrapper.itemBroker == null) - throw new GarnetException("Object store is disabled"); - var result = storeWrapper.itemBroker.GetCollectionItemAsync(command, keysBytes, this, timeout).Result; if (!result.Found) diff --git a/libs/server/Storage/Session/ObjectStore/SortedSetOps.cs b/libs/server/Storage/Session/ObjectStore/SortedSetOps.cs index a514a3a521..8ac9c4c3cf 100644 --- a/libs/server/Storage/Session/ObjectStore/SortedSetOps.cs +++ b/libs/server/Storage/Session/ObjectStore/SortedSetOps.cs @@ -612,7 +612,6 @@ public GarnetStatus SortedSetDifferenceStore(ArgSlice destinationKey, ReadOnlySp } count = pairs?.Count ?? 0; - var destinationKeyBty = destinationKey.ToArray(); if (count > 0) { SortedSetObject newSetObject = new(); @@ -620,7 +619,10 @@ public GarnetStatus SortedSetDifferenceStore(ArgSlice destinationKey, ReadOnlySp { newSetObject.Add(element, score); } - _ = SET(destinationKeyBty, newSetObject, ref objectContext); + + var destinationKeyBytes = destinationKey.ToArray(); + _ = SET(destinationKeyBytes, newSetObject, ref objectContext); + itemBroker.HandleCollectionUpdate(destinationKeyBytes); } else { @@ -628,8 +630,6 @@ public GarnetStatus SortedSetDifferenceStore(ArgSlice destinationKey, ReadOnlySp ref lockableContext, ref objectContext); } - itemBroker.HandleCollectionUpdate(destinationKeyBty); - return status; } finally @@ -1072,7 +1072,10 @@ public GarnetStatus SortedSetUnionStore(ArgSlice destinationKey, ReadOnlySpan { using var lcr = TestUtils.CreateRequest(); - Task.Delay(TimeSpan.FromSeconds(5)).Wait(); + Task.Delay(TimeSpan.FromSeconds(2)).Wait(); return lcr.SendCommand($"RPUSH {key} {string.Join(" ", values)}"); }); From f0630930f58fb0341712c3eddcb584f08e2c72c9 Mon Sep 17 00:00:00 2001 From: Vijay-Nirmal Date: Sun, 12 Jan 2025 01:33:43 +0530 Subject: [PATCH 4/4] Reusing Items property --- .../ItemBroker/CollectionItemBroker.cs | 11 ++++++---- .../ItemBroker/CollectionItemResult.cs | 21 +++++++++++++++---- libs/server/Resp/Objects/SortedSetCommands.cs | 14 ++++++------- 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/libs/server/Objects/ItemBroker/CollectionItemBroker.cs b/libs/server/Objects/ItemBroker/CollectionItemBroker.cs index eb66f29a07..f781d26974 100644 --- a/libs/server/Objects/ItemBroker/CollectionItemBroker.cs +++ b/libs/server/Objects/ItemBroker/CollectionItemBroker.cs @@ -398,7 +398,7 @@ private static unsafe bool TryGetNextSetObjects(byte[] key, SortedSetObject sort case RespCommand.BZPOPMIN: case RespCommand.BZPOPMAX: var element = sortedSetObj.PopMinOrMax(command == RespCommand.BZPOPMAX); - result = new CollectionItemResult(key, [element]); + result = new CollectionItemResult(key, element.Score, element.Element); return true; case RespCommand.BZMPOP: @@ -406,14 +406,17 @@ private static unsafe bool TryGetNextSetObjects(byte[] key, SortedSetObject sort var popCount = *(int*)cmdArgs[1].ptr; popCount = Math.Min(popCount, sortedSetObj.Dictionary.Count); - var scoredItems = new (double Score, byte[] Element)[popCount]; + var scores = new double[popCount]; + var items = new byte[popCount][]; for (int i = 0; i < popCount; i++) { - scoredItems[i] = sortedSetObj.PopMinOrMax(!lowScoresFirst); + var popResult = sortedSetObj.PopMinOrMax(!lowScoresFirst); + scores[i] = popResult.Score; + items[i] = popResult.Element; } - result = new CollectionItemResult(key, scoredItems); + result = new CollectionItemResult(key, scores, items); return true; default: diff --git a/libs/server/Objects/ItemBroker/CollectionItemResult.cs b/libs/server/Objects/ItemBroker/CollectionItemResult.cs index d92f01ba2c..a971cc1232 100644 --- a/libs/server/Objects/ItemBroker/CollectionItemResult.cs +++ b/libs/server/Objects/ItemBroker/CollectionItemResult.cs @@ -20,10 +20,18 @@ public CollectionItemResult(byte[] key, byte[][] items) Items = items; } - public CollectionItemResult(byte[] key, (double Score, byte[] Element)[] scoredItems) + public CollectionItemResult(byte[] key, double score, byte[] item) { Key = key; - ScoredItems = scoredItems; + Score = score; + Item = item; + } + + public CollectionItemResult(byte[] key, double[] scores, byte[][] items) + { + Key = key; + Scores = scores; + Items = items; } /// @@ -41,15 +49,20 @@ public CollectionItemResult(byte[] key, (double Score, byte[] Element)[] scoredI /// internal byte[] Item { get; } + /// + /// Score associated with the item retrieved from the collection + /// + internal double Score { get; } + /// /// Item retrieved from collection /// internal byte[][] Items { get; } /// - /// Scored items retrieved from collection, where each item has an associated score. + /// Scores associated with the items retrieved from the collection /// - internal (double Score, byte[] Element)[] ScoredItems { get; } + internal double[] Scores { get; } /// /// Instance of empty result diff --git a/libs/server/Resp/Objects/SortedSetCommands.cs b/libs/server/Resp/Objects/SortedSetCommands.cs index a96302f489..4402b19c33 100644 --- a/libs/server/Resp/Objects/SortedSetCommands.cs +++ b/libs/server/Resp/Objects/SortedSetCommands.cs @@ -1564,11 +1564,10 @@ private unsafe bool SortedSetBlockingPop(RespCommand command) while (!RespWriteUtils.WriteBulkString(result.Key, ref dcurr, dend)) SendAndReset(); - var memberAndScore = result.ScoredItems; - while (!RespWriteUtils.WriteBulkString(memberAndScore[0].Element, ref dcurr, dend)) + while (!RespWriteUtils.WriteBulkString(result.Item, ref dcurr, dend)) SendAndReset(); - while (!RespWriteUtils.TryWriteDoubleBulkString(memberAndScore[0].Score, ref dcurr, dend)) + while (!RespWriteUtils.TryWriteDoubleBulkString(result.Score, ref dcurr, dend)) SendAndReset(); } @@ -1668,17 +1667,16 @@ private unsafe bool SortedSetBlockingMPop() while (!RespWriteUtils.WriteBulkString(result.Key, ref dcurr, dend)) SendAndReset(); - var pairs = result.ScoredItems; - while (!RespWriteUtils.WriteArrayLength(pairs.Length, ref dcurr, dend)) + while (!RespWriteUtils.WriteArrayLength(result.Items.Length, ref dcurr, dend)) SendAndReset(); - for (var i = 0; i < pairs.Length; i += 2) + for (var i = 0; i < result.Items.Length; i += 2) { while (!RespWriteUtils.WriteArrayLength(2, ref dcurr, dend)) SendAndReset(); - while (!RespWriteUtils.WriteBulkString(pairs[i].Element, ref dcurr, dend)) + while (!RespWriteUtils.WriteBulkString(result.Items[i], ref dcurr, dend)) SendAndReset(); - while (!RespWriteUtils.TryWriteDoubleBulkString(pairs[i].Score, ref dcurr, dend)) + while (!RespWriteUtils.TryWriteDoubleBulkString(result.Scores[i], ref dcurr, dend)) SendAndReset(); }