diff --git a/Milvus.Client.Tests/DataTests.cs b/Milvus.Client.Tests/DataTests.cs index 6460fad..e07db78 100644 --- a/Milvus.Client.Tests/DataTests.cs +++ b/Milvus.Client.Tests/DataTests.cs @@ -38,6 +38,56 @@ public async Task Insert_Drop() Assert.Empty(result.Data); } + [Fact] + public async Task Upsert() + { + await Collection.InsertAsync( + [ + FieldData.Create("id", new[] { 1L }), + FieldData.CreateFloatVector("float_vector", new ReadOnlyMemory[] { new[] { 20f, 30f } }) + ]); + + MutationResult upsertResult = await Collection.UpsertAsync( + [ + FieldData.Create("id", new[] { 1L, 2L }), + FieldData.CreateFloatVector( + "float_vector", + new ReadOnlyMemory[] { new[] { 1f, 2f }, new[] { 3f, 4f } }) + ]); + + Assert.Collection(upsertResult.Ids.LongIds!, + i => Assert.Equal(1, i), + i => Assert.Equal(2, i)); + // TODO: Weirdly these all seem to contain 2, though we're supposed to have inserted one row and updated one + // Assert.Equal(0, upsertResult.DeleteCount); + // Assert.Equal(1, upsertResult.InsertCount); + // Assert.Equal(1, upsertResult.UpsertCount); + + IReadOnlyList results = await Collection.QueryAsync( + "id in [1,2]", + new() + { + OutputFields = { "float_vector" }, + ConsistencyLevel = ConsistencyLevel.Strong + }); + + Assert.Collection( + results.OrderBy(r => r.FieldName), + r => + { + Assert.Equal("float_vector", r.FieldName); + Assert.Collection( + ((FloatVectorFieldData)r).Data, + v => Assert.Equal(new[] { 1f, 2f }, v), + v => Assert.Equal(new[] { 3f, 4f }, v)); + }, + r => + { + Assert.Equal("id", r.FieldName); + Assert.Equivalent(new[] { 1L, 2L }, ((FieldData)r).Data); + }); + } + [Fact] public async Task Timestamp_conversion() { diff --git a/Milvus.Client/MilvusCollection.Entity.cs b/Milvus.Client/MilvusCollection.Entity.cs index 2084cd7..5e897a0 100644 --- a/Milvus.Client/MilvusCollection.Entity.cs +++ b/Milvus.Client/MilvusCollection.Entity.cs @@ -36,10 +36,60 @@ public async Task InsertAsync( request.PartitionName = partitionName; } + PopulateData(data, request.FieldsData); + + request.NumRows = (uint)data[0].RowCount; + + Grpc.MutationResult response = + await _client.InvokeAsync(_client.GrpcClient.InsertAsync, request, static r => r.Status, cancellationToken) + .ConfigureAwait(false); + + _client.CollectionLastMutationTimestamps[Name] = response.Timestamp; + + return new MutationResult(response); + } + + /// + /// Upserts rows of data into a collection. + /// + /// The field data to upsert; each field contains a list of row values. + /// An optional name of a partition to upsert into. + /// + /// The token to monitor for cancellation requests. The default value is . + /// + public async Task UpsertAsync( + IReadOnlyList data, + string? partitionName = null, + CancellationToken cancellationToken = default) + { + Verify.NotNull(data); + + UpsertRequest request = new() { CollectionName = Name }; + + if (partitionName is not null) + { + request.PartitionName = partitionName; + } + + PopulateData(data, request.FieldsData); + + request.NumRows = (uint)data[0].RowCount; + + Grpc.MutationResult response = + await _client.InvokeAsync(_client.GrpcClient.UpsertAsync, request, static r => r.Status, cancellationToken) + .ConfigureAwait(false); + + _client.CollectionLastMutationTimestamps[Name] = response.Timestamp; + + return new MutationResult(response); + } + + private static void PopulateData(IReadOnlyList fieldsData, RepeatedField grpcFieldsData) + { Dictionary?[]? dynamicFieldsData = null; - long count = data[0].RowCount; - foreach (FieldData field in data) + long count = fieldsData[0].RowCount; + foreach (FieldData field in fieldsData) { if (field.RowCount != count) { @@ -60,7 +110,7 @@ public async Task InsertAsync( } else { - request.FieldsData.Add(field.ToGrpcFieldData()); + grpcFieldsData.Add(field.ToGrpcFieldData()); } } @@ -73,18 +123,8 @@ public async Task InsertAsync( } FieldData metaFieldData = new FieldData(encodedJsonStrings, MilvusDataType.Json, isDynamic: true); - request.FieldsData.Add(metaFieldData.ToGrpcFieldData()); + grpcFieldsData.Add(metaFieldData.ToGrpcFieldData()); } - - request.NumRows = (uint)count; - - Grpc.MutationResult response = - await _client.InvokeAsync(_client.GrpcClient.InsertAsync, request, static r => r.Status, cancellationToken) - .ConfigureAwait(false); - - _client.CollectionLastMutationTimestamps[Name] = response.Timestamp; - - return new MutationResult(response); } ///