-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added Query iterator method Continuation #89
Changes from 1 commit
29dd6a2
3bb279b
4d71943
0bc8f0c
f5dd636
3602fe1
f1009b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
using System.Runtime.InteropServices; | ||
using System.Text.Json; | ||
using Google.Protobuf.Collections; | ||
using KeyValuePair = Milvus.Client.Grpc.KeyValuePair; | ||
|
||
namespace Milvus.Client; | ||
|
||
|
@@ -143,7 +144,7 @@ public async Task<MutationResult> DeleteAsync( | |
{ | ||
Verify.NotNullOrWhiteSpace(expression); | ||
|
||
var request = new DeleteRequest | ||
DeleteRequest request = new DeleteRequest | ||
{ | ||
CollectionName = Name, | ||
Expr = expression, | ||
|
@@ -390,7 +391,7 @@ public Task<FlushResult> FlushAsync(CancellationToken cancellationToken = defaul | |
public async Task<IReadOnlyList<PersistentSegmentInfo>> GetPersistentSegmentInfosAsync( | ||
CancellationToken cancellationToken = default) | ||
{ | ||
var request = new GetPersistentSegmentInfoRequest { CollectionName = Name }; | ||
GetPersistentSegmentInfoRequest request = new GetPersistentSegmentInfoRequest { CollectionName = Name }; | ||
|
||
GetPersistentSegmentInfoResponse response = await _client.InvokeAsync( | ||
_client.GrpcClient.GetPersistentSegmentInfoAsync, | ||
|
@@ -429,7 +430,7 @@ public async Task<IReadOnlyList<FieldData>> QueryAsync( | |
|
||
PopulateQueryRequestFromParameters(request, parameters); | ||
|
||
var response = await _client.InvokeAsync( | ||
QueryResults? response = await _client.InvokeAsync( | ||
_client.GrpcClient.QueryAsync, | ||
request, | ||
static r => r.Status, | ||
|
@@ -460,22 +461,22 @@ public async IAsyncEnumerable<IReadOnlyList<FieldData>> QueryWithIteratorAsync( | |
throw new MilvusException("Not support offset when searching iteration"); | ||
} | ||
|
||
var describeResponse = await _client.InvokeAsync( | ||
DescribeCollectionResponse? describeResponse = await _client.InvokeAsync( | ||
_client.GrpcClient.DescribeCollectionAsync, | ||
new DescribeCollectionRequest { CollectionName = Name }, | ||
r => r.Status, | ||
cancellationToken) | ||
.ConfigureAwait(false); | ||
|
||
var pkField = describeResponse.Schema.Fields.FirstOrDefault(x => x.IsPrimaryKey); | ||
Grpc.FieldSchema? pkField = describeResponse.Schema.Fields.FirstOrDefault(x => x.IsPrimaryKey); | ||
if (pkField == null) | ||
{ | ||
throw new MilvusException("Schema must contain pk field"); | ||
} | ||
|
||
var isUserRequestPkField = parameters?.OutputFieldsInternal?.Contains(pkField.Name) ?? false; | ||
var userExpression = expression; | ||
var userLimit = parameters?.Limit ?? int.MaxValue; | ||
bool isUserRequestPkField = parameters?.OutputFieldsInternal?.Contains(pkField.Name) ?? false; | ||
string? userExpression = expression; | ||
int userLimit = parameters?.Limit ?? int.MaxValue; | ||
|
||
QueryRequest request = new() | ||
{ | ||
|
@@ -486,8 +487,10 @@ public async IAsyncEnumerable<IReadOnlyList<FieldData>> QueryWithIteratorAsync( | |
{userExpression: not null} => userExpression, | ||
// If user expression is null and pk field is string | ||
{pkField.DataType: DataType.VarChar} => $"{pkField.Name} != ''", | ||
// If user expression is null and pk field is not string | ||
_ => $"{pkField.Name} < {long.MaxValue}", | ||
// If user expression is null and pk field is int | ||
{pkField.DataType: DataType.Int8 or DataType.Int16 or DataType.Int32 or DataType.Int64} => $"{pkField.Name} < {long.MaxValue}", | ||
// If user expression is null and pk field is not string and not int | ||
_ => throw new MilvusException("Unsupported data type for primary key field") | ||
} | ||
}; | ||
|
||
|
@@ -497,17 +500,18 @@ public async IAsyncEnumerable<IReadOnlyList<FieldData>> QueryWithIteratorAsync( | |
if (!isUserRequestPkField) request.OutputFields.Add(pkField.Name); | ||
|
||
// Replace parameters required for iterator | ||
string iterationBatchSize = Math.Min(batchSize, userLimit).ToString(CultureInfo.InvariantCulture); | ||
ReplaceKeyValueItems(request.QueryParams, | ||
new Grpc.KeyValuePair {Key = Constants.Iterator, Value = "True"}, | ||
new Grpc.KeyValuePair {Key = Constants.ReduceStopForBest, Value = "True"}, | ||
new Grpc.KeyValuePair {Key = Constants.BatchSize, Value = batchSize.ToString(CultureInfo.InvariantCulture)}, | ||
new Grpc.KeyValuePair {Key = Constants.BatchSize, Value = iterationBatchSize}, | ||
new Grpc.KeyValuePair {Key = Constants.Offset, Value = 0.ToString(CultureInfo.InvariantCulture)}, | ||
new Grpc.KeyValuePair {Key = Constants.Limit, Value = Math.Min(batchSize, userLimit).ToString(CultureInfo.InvariantCulture)}); | ||
new Grpc.KeyValuePair {Key = Constants.Limit, Value = iterationBatchSize}); | ||
|
||
var processedItemsCount = 0; | ||
int processedItemsCount = 0; | ||
while (!cancellationToken.IsCancellationRequested) | ||
{ | ||
var response = await _client.InvokeAsync( | ||
QueryResults? response = await _client.InvokeAsync( | ||
_client.GrpcClient.QueryAsync, | ||
request, | ||
static r => r.Status, | ||
|
@@ -516,16 +520,22 @@ public async IAsyncEnumerable<IReadOnlyList<FieldData>> QueryWithIteratorAsync( | |
|
||
object? pkLastValue; | ||
int processedDuringIterationCount; | ||
var pkFieldsData = response.FieldsData.Single(x => x.FieldId == pkField.FieldID); | ||
if (pkField.DataType == DataType.VarChar) | ||
Grpc.FieldData? pkFieldsData = response.FieldsData.Single(x => x.FieldId == pkField.FieldID); | ||
switch (pkField.DataType) | ||
{ | ||
pkLastValue = pkFieldsData.Scalars.StringData.Data.LastOrDefault(); | ||
processedDuringIterationCount = pkFieldsData.Scalars.StringData.Data.Count; | ||
} | ||
else | ||
{ | ||
pkLastValue = pkFieldsData.Scalars.IntData.Data.LastOrDefault(); | ||
processedDuringIterationCount = pkFieldsData.Scalars.IntData.Data.Count; | ||
case DataType.VarChar: | ||
pkLastValue = pkFieldsData.Scalars.StringData.Data.LastOrDefault(); | ||
processedDuringIterationCount = pkFieldsData.Scalars.StringData.Data.Count; | ||
break; | ||
case DataType.Int8: | ||
case DataType.Int16: | ||
case DataType.Int32: | ||
case DataType.Int64: | ||
pkLastValue = pkFieldsData.Scalars.IntData.Data.LastOrDefault(); | ||
processedDuringIterationCount = pkFieldsData.Scalars.IntData.Data.Count; | ||
break; | ||
default: | ||
throw new MilvusException("Unsupported data type for primary key field"); | ||
} | ||
|
||
// If there are no more items to process, we should break the loop | ||
|
@@ -540,7 +550,7 @@ public async IAsyncEnumerable<IReadOnlyList<FieldData>> QueryWithIteratorAsync( | |
yield return ProcessReturnedFieldData(response.FieldsData); | ||
|
||
processedItemsCount += processedDuringIterationCount; | ||
var leftItemsCount = userLimit - processedItemsCount; | ||
int leftItemsCount = userLimit - processedItemsCount; | ||
|
||
// If user limit is reached, we should break the loop | ||
if(leftItemsCount <= 0) yield break; | ||
|
@@ -554,7 +564,7 @@ public async IAsyncEnumerable<IReadOnlyList<FieldData>> QueryWithIteratorAsync( | |
Value = Math.Min(batchSize, leftItemsCount).ToString(CultureInfo.InvariantCulture) | ||
}); | ||
|
||
var nextExpression = pkField.DataType == DataType.VarChar | ||
string nextExpression = pkField.DataType == DataType.VarChar | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In the concatenation below ( There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. added |
||
? $"{pkField.Name} > '{pkLastValue}'" | ||
: $"{pkField.Name} > {pkLastValue}"; | ||
|
||
|
@@ -577,7 +587,7 @@ public async IAsyncEnumerable<IReadOnlyList<FieldData>> QueryWithIteratorAsync( | |
public async Task<IReadOnlyList<QuerySegmentInfoResult>> GetQuerySegmentInfoAsync( | ||
CancellationToken cancellationToken = default) | ||
{ | ||
var request = new GetQuerySegmentInfoRequest { CollectionName = Name }; | ||
GetQuerySegmentInfoRequest request = new GetQuerySegmentInfoRequest { CollectionName = Name }; | ||
|
||
GetQuerySegmentInfoResponse response = | ||
await _client.InvokeAsync(_client.GrpcClient.GetQuerySegmentInfoAsync, request, static r => r.Status, | ||
|
@@ -780,14 +790,14 @@ ulong CalculateGuaranteeTimestamp( | |
|
||
private static void ReplaceKeyValueItems(RepeatedField<Grpc.KeyValuePair> collection, params Grpc.KeyValuePair[] pairs) | ||
{ | ||
var obsoleteParameterKeys = pairs.Select(x => x.Key).Distinct().ToArray(); | ||
var obsoleteParameters = collection.Where(x => obsoleteParameterKeys.Contains(x.Key)).ToArray(); | ||
foreach (var field in obsoleteParameters) | ||
string[] obsoleteParameterKeys = pairs.Select(x => x.Key).Distinct().ToArray(); | ||
KeyValuePair[] obsoleteParameters = collection.Where(x => obsoleteParameterKeys.Contains(x.Key)).ToArray(); | ||
foreach (KeyValuePair field in obsoleteParameters) | ||
{ | ||
collection.Remove(field); | ||
} | ||
|
||
foreach (var pair in pairs) | ||
foreach (KeyValuePair pair in pairs) | ||
{ | ||
collection.Add(pair); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do the same switch here as above, with an exception for unsupported types etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added