From be24c657306361115d5d47573a862d58df86b272 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 8 Oct 2024 14:16:23 +0200 Subject: [PATCH 01/10] Hacking new serialization with Steve at techorama --- ...kRequestCreationForDataStreamBenchmarks.cs | 22 ++--- ...estCreationWithFixedIndexNameBenchmarks.cs | 22 ++--- ...reationWithTemplatedIndexNameBenchmarks.cs | 22 ++--- .../Program.cs | 2 +- build/scripts/CommandLine.fs | 10 +- global.json | 2 +- .../DataStreams/DataStreamChannel.cs | 7 +- ...y.cs => ElasticsearchChannelBase.Bytes.cs} | 94 ++++++++++++++----- .../ElasticsearchChannelBase.Serialization.cs | 38 ++++++++ .../ElasticsearchChannelBase.cs | 21 ++--- .../Indices/IndexChannel.cs | 13 ++- .../Serialization/BulkOperationHeader.cs | 7 ++ 12 files changed, 180 insertions(+), 80 deletions(-) rename src/Elastic.Ingest.Elasticsearch/{Serialization/BulkRequestDataFactory.cs => ElasticsearchChannelBase.Bytes.cs} (74%) create mode 100644 src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs index c44a74a..27f4adc 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs @@ -42,15 +42,15 @@ public void Setup() _data = StockData.CreateSampleData(DocumentsToIndex); } - [Benchmark(Baseline = true)] - public async Task WriteToStreamAsync() - { - MemoryStream.Position = 0; - var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, _ => _bulkOperationHeader); - var requestData = new RequestData( - POST, "/_bulk", PostData.ReadOnlyMemory(bytes), - _transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData() - ); - await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None); - } + // [Benchmark(Baseline = true)] + // public async Task WriteToStreamAsync() + // { + // MemoryStream.Position = 0; + // var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, _ => _bulkOperationHeader); + // var requestData = new RequestData( + // POST, "/_bulk", PostData.ReadOnlyMemory(bytes), + // _transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData() + // ); + // await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None); + // } } diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithFixedIndexNameBenchmarks.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithFixedIndexNameBenchmarks.cs index 0d149d4..c3b1049 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithFixedIndexNameBenchmarks.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithFixedIndexNameBenchmarks.cs @@ -41,15 +41,15 @@ public void Setup() _data = StockData.CreateSampleData(DocumentsToIndex); } - [Benchmark(Baseline = true)] - public async Task WriteToStreamAsync() - { - MemoryStream.Position = 0; - var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, true)); - var requestData = new RequestData( - POST, "/_bulk", PostData.ReadOnlyMemory(bytes), - _transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData() - ); - await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None); - } + // [Benchmark(Baseline = true)] + // public async Task WriteToStreamAsync() + // { + // MemoryStream.Position = 0; + // var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, true)); + // var requestData = new RequestData( + // POST, "/_bulk", PostData.ReadOnlyMemory(bytes), + // _transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData() + // ); + // await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None); + // } } diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithTemplatedIndexNameBenchmarks.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithTemplatedIndexNameBenchmarks.cs index cb9b80f..cb778b2 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithTemplatedIndexNameBenchmarks.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithTemplatedIndexNameBenchmarks.cs @@ -40,15 +40,15 @@ public void Setup() _data = StockData.CreateSampleData(DocumentsToIndex); } - [Benchmark(Baseline = true)] - public async Task DynamicIndexName_WriteToStreamAsync() - { - MemoryStream.Position = 0; - var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, false)); - var requestData = new RequestData( - POST, "/_bulk", PostData.ReadOnlyMemory(bytes), - _transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData() - ); - await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None); - } + // [Benchmark(Baseline = true)] + // public async Task DynamicIndexName_WriteToStreamAsync() + // { + // MemoryStream.Position = 0; + // var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, false)); + // var requestData = new RequestData( + // POST, "/_bulk", PostData.ReadOnlyMemory(bytes), + // _transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData() + // ); + // await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None); + // } } diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs index 2564764..94f10b7 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs @@ -24,7 +24,7 @@ var bm = new BulkRequestCreationWithFixedIndexNameBenchmarks(); bm.Setup(); -await bm.WriteToStreamAsync(); +//await bm.WriteToStreamAsync(); var length = bm.MemoryStream.Length; diff --git a/build/scripts/CommandLine.fs b/build/scripts/CommandLine.fs index 12a4cc5..545f2b0 100644 --- a/build/scripts/CommandLine.fs +++ b/build/scripts/CommandLine.fs @@ -29,11 +29,11 @@ with interface IArgParserTemplate with member this.Usage = match this with - | Clean _ -> "clean known output locations" - | Build _ -> "Run build" - | Test _ -> "Runs build then tests" - | Release _ -> "runs build, tests, and create and validates the packages shy of publishing them" - | Publish _ -> "Runs the full release" + | Clean -> "clean known output locations" + | Build -> "Run build" + | Test -> "Runs build then tests" + | Release -> "runs build, tests, and create and validates the packages shy of publishing them" + | Publish -> "Runs the full release" | SingleTarget _ -> "Runs the provided sub command without running their dependencies" | Token _ -> "Token to be used to authenticate with github" diff --git a/global.json b/global.json index c317b00..789bff3 100644 --- a/global.json +++ b/global.json @@ -1,6 +1,6 @@ { "sdk": { - "version": "6.0.302", + "version": "8.0.100", "rollForward": "latestFeature", "allowPrerelease": false } diff --git a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs index 44cd75b..d04d59f 100644 --- a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs @@ -29,8 +29,11 @@ public DataStreamChannel(DataStreamChannelOptions options, ICollection - protected override BulkOperationHeader CreateBulkOperationHeader(TEvent @event) => _fixedHeader; + /// + protected override IndexOp GetIndexOp(TEvent @event) => IndexOp.CreateNoParams; + + /// + protected override void MutateHeader(ref readonly BulkHeader header) { } /// protected override string TemplateName => Options.DataStream.GetTemplateName(); diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkRequestDataFactory.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs similarity index 74% rename from src/Elastic.Ingest.Elasticsearch/Serialization/BulkRequestDataFactory.cs rename to src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs index 8f00950..4db13a1 100644 --- a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkRequestDataFactory.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs @@ -2,36 +2,67 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information -#if NETSTANDARD2_1_OR_GREATER +using System; using System.Buffers; -#else using System.Collections.Generic; -#endif -using System; using System.IO; +using System.Linq; using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using Elastic.Channels; +using Elastic.Channels.Diagnostics; +using Elastic.Ingest.Elasticsearch.DataStreams; using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Ingest.Elasticsearch.Serialization; +using Elastic.Ingest.Transport; +using Elastic.Transport; +using Elastic.Transport.Products.Elasticsearch; using static Elastic.Ingest.Elasticsearch.ElasticsearchChannelStatics; -namespace Elastic.Ingest.Elasticsearch.Serialization; +namespace Elastic.Ingest.Elasticsearch; + +/// TODO +public enum IndexOp +{ + /// + Index, + /// + IndexNoParams, + /// + Create, + /// + CreateNoParams, + /// + Delete, + /// + Update, +} + +/// TODO +public readonly struct BulkHeader +{ + +} /// -/// Provides static factory methods from producing request data for bulk requests. +/// An abstract base class for both and +/// Coordinates most of the sending to- and bootstrapping of Elasticsearch /// -public static class BulkRequestDataFactory +public abstract partial class ElasticsearchChannelBase + : TransportChannelBase + where TChannelOptions : ElasticsearchChannelOptionsBase { + #if NETSTANDARD2_1_OR_GREATER /// /// Get the NDJSON request body bytes for a page of events. /// - /// The type for the event being ingested. /// A page of events. /// The for the channel where the request will be written. /// A function which takes an instance of and produces the operation header containing the action and optional meta data. /// A of representing the entire request body in NDJSON format. - public static ReadOnlyMemory GetBytes(ArraySegment page, + public ReadOnlyMemory GetBytes(ArraySegment page, ElasticsearchChannelOptionsBase options, Func createHeaderFactory) { // ArrayBufferWriter inserts comma's when serializing multiple times @@ -71,19 +102,25 @@ public static ReadOnlyMemory GetBytes(ArraySegment page, } #endif + /// TODO + protected abstract IndexOp GetIndexOp(TEvent @event); + + /// + /// + /// + /// + protected abstract void MutateHeader(ref readonly BulkHeader header); + /// /// Asynchronously write the NDJSON request body for a page of events to . /// - /// The type for the event being ingested. /// A page of events. /// The target for the request. /// The for the channel where the request will be written. - /// A function which takes an instance of and produces the operation header containing the action and optional meta data. /// The cancellation token to cancel operation. /// - public static async Task WriteBufferToStreamAsync(ArraySegment page, Stream stream, - ElasticsearchChannelOptionsBase options, Func createHeaderFactory, - CancellationToken ctx = default) + public async Task WriteBufferToStreamAsync( + ArraySegment page, Stream stream, ElasticsearchChannelOptionsBase options, CancellationToken ctx = default) { #if NETSTANDARD2_1_OR_GREATER var items = page; @@ -99,12 +136,27 @@ public static async Task WriteBufferToStreamAsync(ArraySegment p var @event = items[i]; if (@event == null) continue; - var indexHeader = createHeaderFactory(@event); - await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(), SerializerOptions, ctx) - .ConfigureAwait(false); + var op = GetIndexOp(@event); + switch (op) + { + case IndexOp.IndexNoParams: + await SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false); + break; + case IndexOp.Index: + case IndexOp.Create: + case IndexOp.Delete: + case IndexOp.Update: + var header = new BulkHeader(); + MutateHeader(ref header); + await SerializeHeaderAsync(stream, ref header, SerializerOptions, ctx).ConfigureAwait(false); + break; + + + } + await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false); - if (indexHeader is UpdateOperation) + if (op == IndexOp.Update) await stream.WriteAsync(DocUpdateHeaderStart, 0, DocUpdateHeaderStart.Length, ctx).ConfigureAwait(false); if (options.EventWriter?.WriteToStreamAsync != null) @@ -113,7 +165,7 @@ await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(), await JsonSerializer.SerializeAsync(stream, @event, SerializerOptions, ctx) .ConfigureAwait(false); - if (indexHeader is UpdateOperation) + if (op == IndexOp.Update) await stream.WriteAsync(DocUpdateHeaderEnd, 0, DocUpdateHeaderEnd.Length, ctx).ConfigureAwait(false); await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false); @@ -123,12 +175,11 @@ await JsonSerializer.SerializeAsync(stream, @event, SerializerOptions, ctx) /// /// Create the bulk operation header with the appropriate action and meta data for a bulk request targeting an index. /// - /// The type for the event being ingested. /// The for which the header will be produced. /// The for the channel. /// Control whether the index name is included in the meta data for the operation. /// A instance. - public static BulkOperationHeader CreateBulkOperationHeaderForIndex(TEvent @event, IndexChannelOptions options, + public static BulkOperationHeader CreateBulkOperationHeaderForIndex(TEvent @event, IndexChannelOptions options, bool skipIndexName = false) { var indexTime = options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now; @@ -161,4 +212,3 @@ public static BulkOperationHeader CreateBulkOperationHeaderForIndex(TEve : skipIndexName ? new CreateOperation() : new CreateOperation { Index = index }; } } - diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs new file mode 100644 index 0000000..6358864 --- /dev/null +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs @@ -0,0 +1,38 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Elastic.Channels; +using Elastic.Channels.Diagnostics; +using Elastic.Ingest.Elasticsearch.DataStreams; +using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Ingest.Elasticsearch.Serialization; +using Elastic.Ingest.Transport; +using Elastic.Transport; +using Elastic.Transport.Products.Elasticsearch; +using static Elastic.Ingest.Elasticsearch.ElasticsearchChannelStatics; + +namespace Elastic.Ingest.Elasticsearch; + +/// +/// An abstract base class for both and +/// Coordinates most of the sending to- and bootstrapping of Elasticsearch +/// +public abstract partial class ElasticsearchChannelBase + : TransportChannelBase + where TChannelOptions : ElasticsearchChannelOptionsBase +{ + private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader header, JsonSerializerOptions serializerOptions, CancellationToken ctx) => + throw new NotImplementedException(); + + private Task SerializePlainIndexHeaderAsync(Stream stream, CancellationToken ctx) => + throw new NotImplementedException(); +} diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs index d391a0f..8c6d39e 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs @@ -19,6 +19,8 @@ namespace Elastic.Ingest.Elasticsearch; + + /// /// An abstract base class for both and /// Coordinates most of the sending to- and bootstrapping of Elasticsearch @@ -71,12 +73,12 @@ protected override Task ExportAsync(ITransport transport, ArraySeg #if NETSTANDARD2_1 // Option is obsolete to prevent external users to set it. #pragma warning disable CS0618 - if (Options.UseReadOnlyMemory) -#pragma warning restore CS0618 - { - var bytes = BulkRequestDataFactory.GetBytes(page, Options, CreateBulkOperationHeader); - return transport.RequestAsync(HttpMethod.POST, BulkUrl, PostData.ReadOnlyMemory(bytes), RequestParams, ctx); - } +// if (Options.UseReadOnlyMemory) +// #pragma warning restore CS0618 +// { +// var bytes = BulkRequestDataFactory.GetBytes(page, Options, CreateBulkOperationHeader); +// return transport.RequestAsync(HttpMethod.POST, BulkUrl, PostData.ReadOnlyMemory(bytes), RequestParams, ctx); +// } #endif #pragma warning disable IDE0022 // Use expression body for method return transport.RequestAsync(HttpMethod.POST, BulkUrl, @@ -85,16 +87,11 @@ protected override Task ExportAsync(ITransport transport, ArraySeg { /* NOT USED */ }, - async (b, stream, ctx) => { await BulkRequestDataFactory.WriteBufferToStreamAsync(b, stream, Options, CreateBulkOperationHeader, ctx).ConfigureAwait(false); }) + async (b, stream, ctx) => { await WriteBufferToStreamAsync(b, stream, Options, ctx).ConfigureAwait(false); }) , RequestParams, ctx); #pragma warning restore IDE0022 // Use expression body for method } - /// - /// Asks implementations to create a based on the being exported. - /// - protected abstract BulkOperationHeader CreateBulkOperationHeader(TEvent @event); - /// protected class HeadIndexTemplateResponse : ElasticsearchResponse { } diff --git a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs index 0e471ed..a8d01ea 100644 --- a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs @@ -15,7 +15,7 @@ namespace Elastic.Ingest.Elasticsearch.Indices; /// public class IndexChannel : ElasticsearchChannelBase> { - private readonly bool _skipIndexNameOnOperations = false; + //private readonly bool _skipIndexNameOnOperations = false; private readonly string _url; /// @@ -31,7 +31,7 @@ public IndexChannel(IndexChannelOptions options, ICollection options, ICollection protected override string BulkUrl => _url; - /// - protected override BulkOperationHeader CreateBulkOperationHeader(TEvent @event) => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(@event, Options, _skipIndexNameOnOperations); + // TODO implement + + /// + protected override IndexOp GetIndexOp(TEvent @event) => IndexOp.IndexNoParams; + + /// + protected override void MutateHeader(ref readonly BulkHeader header) => throw new NotImplementedException(); /// protected override string TemplateName { get; } diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs index b5dd325..b66ed31 100644 --- a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs +++ b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs @@ -8,6 +8,13 @@ namespace Elastic.Ingest.Elasticsearch.Serialization; +/// +/// +/// +public struct BulkOperationHeader2 +{ +} + /// Represents the _bulk operation meta header public abstract class BulkOperationHeader { From 7d2f9be7a656bea2601f3444dcacf0c5921c569c Mon Sep 17 00:00:00 2001 From: Steve Gordon Date: Tue, 8 Oct 2024 14:44:48 +0200 Subject: [PATCH 02/10] Support .NET 8.0 and improve serialization methods Updated `ConsumeOutboundEventsAsync` in `BufferedChannelBase.cs` to conditionally cancel `_exitCancelSource` using `CancelAsync` for .NET 8.0 or greater, and `Cancel` for other versions. Modified `ToString` method to return a nullable string (`string?`). Updated target frameworks in `Elastic.Channels.csproj` and `Elastic.Ingest.Elasticsearch.csproj` to include `net8.0`. Enhanced `WriteBufferToStreamAsync` in `ElasticsearchChannelBase.Bytes.cs` to handle `IndexOp.CreateNoParams` and use `SerializePlainIndexHeaderAsync` for serialization. Removed several `using` directives in `ElasticsearchChannelBase.Serialization.cs` and introduced static read-only spans for `PlainIndexBytesSpan` and `PlainCreateBytesSpan`. Added conditional compilation for serialization methods to use `ValueTask` and `ReadOnlySpan` for .NET 8.0 or greater, and `byte[]` for other versions. --- src/Elastic.Channels/BufferedChannelBase.cs | 6 ++- src/Elastic.Channels/Elastic.Channels.csproj | 2 +- .../Elastic.Ingest.Elasticsearch.csproj | 2 +- .../ElasticsearchChannelBase.Bytes.cs | 7 +-- .../ElasticsearchChannelBase.Serialization.cs | 49 +++++++++++++++---- 5 files changed, 50 insertions(+), 16 deletions(-) diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index ae2128c..1deaf41 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -310,7 +310,11 @@ private async Task ConsumeOutboundEventsAsync() } } await Task.WhenAll(_taskList).ConfigureAwait(false); +#if NET8_0_OR_GREATER + await _exitCancelSource.CancelAsync().ConfigureAwait(false); +#else _exitCancelSource.Cancel(); +#endif _callbacks.OutboundChannelExitedCallback?.Invoke(); } @@ -422,7 +426,7 @@ async Task AsyncSlowPath(IOutboundBuffer b) } /// > - public override string ToString() + public override string? ToString() { if (DiagnosticsListener == null) return base.ToString(); var sb = new StringBuilder(); diff --git a/src/Elastic.Channels/Elastic.Channels.csproj b/src/Elastic.Channels/Elastic.Channels.csproj index 4f0e70a..165103f 100644 --- a/src/Elastic.Channels/Elastic.Channels.csproj +++ b/src/Elastic.Channels/Elastic.Channels.csproj @@ -1,7 +1,7 @@ - netstandard2.0;netstandard2.1 + netstandard2.0;netstandard2.1;net8.0 Provides components to build a buffer-backed channel that flushes batches of data in a controlled (Max N || Max Duration) manner. elastic, channels, buffer latest diff --git a/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj b/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj index 6e537ed..730ec27 100644 --- a/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj +++ b/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj @@ -1,7 +1,7 @@ - netstandard2.0;netstandard2.1 + netstandard2.0;netstandard2.1;net8.0 Offers an easy to use ChannelWriter implementation to push data concurrently to Elasticsearch using Elastic.Transport elastic, channels, elasticsearch, ingest latest diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs index 4db13a1..779bba8 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs @@ -140,7 +140,10 @@ public async Task WriteBufferToStreamAsync( switch (op) { case IndexOp.IndexNoParams: - await SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false); + await ElasticsearchChannelBase.SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false); + break; + case IndexOp.CreateNoParams: + await SerializePlainCreateHeaderAsync(stream, ctx).ConfigureAwait(false); break; case IndexOp.Index: case IndexOp.Create: @@ -150,8 +153,6 @@ public async Task WriteBufferToStreamAsync( MutateHeader(ref header); await SerializeHeaderAsync(stream, ref header, SerializerOptions, ctx).ConfigureAwait(false); break; - - } await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false); diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs index 6358864..953a360 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs @@ -3,22 +3,14 @@ // See the LICENSE file in the project root for more information using System; -using System.Buffers; -using System.Collections.Generic; using System.IO; -using System.Linq; using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using Elastic.Channels; -using Elastic.Channels.Diagnostics; using Elastic.Ingest.Elasticsearch.DataStreams; using Elastic.Ingest.Elasticsearch.Indices; using Elastic.Ingest.Elasticsearch.Serialization; using Elastic.Ingest.Transport; -using Elastic.Transport; -using Elastic.Transport.Products.Elasticsearch; -using static Elastic.Ingest.Elasticsearch.ElasticsearchChannelStatics; namespace Elastic.Ingest.Elasticsearch; @@ -30,9 +22,46 @@ public abstract partial class ElasticsearchChannelBase : TransportChannelBase where TChannelOptions : ElasticsearchChannelOptionsBase { + private static ReadOnlySpan PlainIndexBytesSpan => """ + {"index":{}} + + """u8; + + private static ReadOnlySpan PlainCreateBytesSpan => """ + {"create":{}} + + """u8; + +#if NETSTANDARD + + private static byte[] PlainIndexBytes => PlainIndexBytesSpan.ToArray(); + + private static byte[] PlainCreateBytes => PlainCreateBytesSpan.ToArray(); +#endif + private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader header, JsonSerializerOptions serializerOptions, CancellationToken ctx) => throw new NotImplementedException(); - private Task SerializePlainIndexHeaderAsync(Stream stream, CancellationToken ctx) => - throw new NotImplementedException(); + +#if NET8_0_OR_GREATER + private static ValueTask SerializePlainIndexHeaderAsync(Stream stream, CancellationToken ctx = default) + { + stream.Write(PlainIndexBytesSpan); + return ValueTask.CompletedTask; + } +#else + private static async ValueTask SerializePlainIndexHeaderAsync(Stream stream, CancellationToken ctx) => + await stream.WriteAsync(PlainIndexBytes, 0, PlainIndexBytes.Length, ctx).ConfigureAwait(false); +#endif + +#if NET8_0_OR_GREATER + private static ValueTask SerializePlainCreateHeaderAsync(Stream stream, CancellationToken ctx = default) + { + stream.Write(PlainCreateBytesSpan); + return ValueTask.CompletedTask; + } +#else + private static async ValueTask SerializePlainCreateHeaderAsync(Stream stream, CancellationToken ctx) => + await stream.WriteAsync(PlainCreateBytes, 0, PlainCreateBytes.Length, ctx).ConfigureAwait(false); +#endif } From 645374396c16f378fdf8a59dac31ed2e783b1a1d Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 8 Oct 2024 14:49:46 +0200 Subject: [PATCH 03/10] started on bulkheader options --- .../Benchmarks/BulkIngestionBenchmarks.cs | 4 -- .../DataStreams/DataStreamChannel.cs | 2 +- .../ElasticsearchChannelBase.Bytes.cs | 71 +++---------------- .../ElasticsearchChannelOptionsBase.cs | 14 ---- .../IElasticsearchEventWriter.cs | 10 --- .../Indices/IndexChannel.cs | 36 ++++++++-- .../Serialization/BulkOperationHeader.cs | 7 -- .../DataStreamIngestionTests.cs | 3 - .../Setup.cs | 3 - 9 files changed, 41 insertions(+), 109 deletions(-) diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs index 52fa94c..d8f71ed 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs @@ -26,9 +26,6 @@ public class BulkIngestionBenchmarks [ParamsAllValues] public bool DisableDiagnostics { get; set; } - [ParamsAllValues] - public bool UseReadOnlyMemory { get; set; } - [GlobalSetup] public void Setup() { @@ -47,7 +44,6 @@ public void Setup() OutboundBufferMaxSize = MaxExportSize }, DisableDiagnostics = DisableDiagnostics, - UseReadOnlyMemory = UseReadOnlyMemory, IndexFormat = "stock-data-v8", OutboundChannelExitedCallback = () => _waitHandle.Set(), #if DEBUG diff --git a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs index d04d59f..1553eee 100644 --- a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs @@ -33,7 +33,7 @@ public DataStreamChannel(DataStreamChannelOptions options, ICollection IndexOp.CreateNoParams; /// - protected override void MutateHeader(ref readonly BulkHeader header) { } + protected override void MutateHeader(TEvent @event, ref BulkHeader header) { } /// protected override string TemplateName => Options.DataStream.GetTemplateName(); diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs index 779bba8..d54902b 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs @@ -6,18 +6,13 @@ using System.Buffers; using System.Collections.Generic; using System.IO; -using System.Linq; using System.Text.Json; using System.Threading; using System.Threading.Tasks; -using Elastic.Channels; -using Elastic.Channels.Diagnostics; using Elastic.Ingest.Elasticsearch.DataStreams; using Elastic.Ingest.Elasticsearch.Indices; using Elastic.Ingest.Elasticsearch.Serialization; using Elastic.Ingest.Transport; -using Elastic.Transport; -using Elastic.Transport.Products.Elasticsearch; using static Elastic.Ingest.Elasticsearch.ElasticsearchChannelStatics; namespace Elastic.Ingest.Elasticsearch; @@ -40,9 +35,14 @@ public enum IndexOp } /// TODO -public readonly struct BulkHeader +public struct BulkHeader { + /// TODO + public string Index { get; set; } + + /// TODO + public string? Id { get; set; } } /// @@ -50,66 +50,15 @@ public readonly struct BulkHeader /// Coordinates most of the sending to- and bootstrapping of Elasticsearch /// public abstract partial class ElasticsearchChannelBase - : TransportChannelBase where TChannelOptions : ElasticsearchChannelOptionsBase { - -#if NETSTANDARD2_1_OR_GREATER - /// - /// Get the NDJSON request body bytes for a page of events. - /// - /// A page of events. - /// The for the channel where the request will be written. - /// A function which takes an instance of and produces the operation header containing the action and optional meta data. - /// A of representing the entire request body in NDJSON format. - public ReadOnlyMemory GetBytes(ArraySegment page, - ElasticsearchChannelOptionsBase options, Func createHeaderFactory) - { - // ArrayBufferWriter inserts comma's when serializing multiple times - // Hence the multiple writer.Resets() as advised on this feature request - // https://github.com/dotnet/runtime/issues/82314 - var bufferWriter = new ArrayBufferWriter(); - using var writer = new Utf8JsonWriter(bufferWriter, WriterOptions); - foreach (var @event in page.AsSpan()) - { - var indexHeader = createHeaderFactory(@event); - JsonSerializer.Serialize(writer, indexHeader, indexHeader.GetType(), SerializerOptions); - bufferWriter.Write(LineFeed); - writer.Reset(); - - if (indexHeader is UpdateOperation) - { - bufferWriter.Write(DocUpdateHeaderStart); - writer.Reset(); - } - - if (options.EventWriter?.WriteToArrayBuffer != null) - options.EventWriter.WriteToArrayBuffer(bufferWriter, @event); - else - JsonSerializer.Serialize(writer, @event, SerializerOptions); - writer.Reset(); - - if (indexHeader is UpdateOperation) - { - bufferWriter.Write(DocUpdateHeaderEnd); - writer.Reset(); - } - - bufferWriter.Write(LineFeed); - writer.Reset(); - } - return bufferWriter.WrittenMemory; - } -#endif - /// TODO protected abstract IndexOp GetIndexOp(TEvent @event); - /// - /// - /// + /// + /// /// - protected abstract void MutateHeader(ref readonly BulkHeader header); + protected abstract void MutateHeader(TEvent @event, ref BulkHeader header); /// /// Asynchronously write the NDJSON request body for a page of events to . @@ -150,7 +99,7 @@ public async Task WriteBufferToStreamAsync( case IndexOp.Delete: case IndexOp.Update: var header = new BulkHeader(); - MutateHeader(ref header); + MutateHeader(@event, ref header); await SerializeHeaderAsync(stream, ref header, SerializerOptions, ctx).ConfigureAwait(false); break; } diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs index 6cc6d67..758deed 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs @@ -22,18 +22,4 @@ protected ElasticsearchChannelOptionsBase(ITransport transport) : base(transport /// public IElasticsearchEventWriter? EventWriter { get; set; } - #if NETSTANDARD2_1_OR_GREATER - /// - /// Expert option, - /// This will eagerly serialize to and use . - /// If false (default) the channel will use to directly write to the stream. - /// - #else - /// - /// Expert option, only available in netstandard2.1+ compatible runtimes to evaluate serialization approaches - /// - #endif - [Obsolete("Temporary exposed expert option, used to evaluate two different approaches to serialization")] - public bool UseReadOnlyMemory { get; set; } - } diff --git a/src/Elastic.Ingest.Elasticsearch/IElasticsearchEventWriter.cs b/src/Elastic.Ingest.Elasticsearch/IElasticsearchEventWriter.cs index cf1eadc..5525605 100644 --- a/src/Elastic.Ingest.Elasticsearch/IElasticsearchEventWriter.cs +++ b/src/Elastic.Ingest.Elasticsearch/IElasticsearchEventWriter.cs @@ -16,16 +16,6 @@ namespace Elastic.Ingest.Elasticsearch; /// public interface IElasticsearchEventWriter { -#if NETSTANDARD2_1_OR_GREATER - /// - /// Provide a custom routine to write to an ArrayBufferWriter{T} - /// This implementation is only called if is true, defaults to false - /// Otherwise is called instead. - /// If `null` will fallback to its own internal implementation. - /// - Action, TEvent>? WriteToArrayBuffer { get; set; } -#endif - /// /// Provide a custom routine to write to a asynchronously /// If `null` will fallback to its internal implementation. diff --git a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs index a8d01ea..bea243f 100644 --- a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs @@ -15,7 +15,7 @@ namespace Elastic.Ingest.Elasticsearch.Indices; /// public class IndexChannel : ElasticsearchChannelBase> { - //private readonly bool _skipIndexNameOnOperations = false; + private readonly bool _skipIndexName; private readonly string _url; /// @@ -31,7 +31,7 @@ public IndexChannel(IndexChannelOptions options, ICollection options, ICollection protected override string BulkUrl => _url; - // TODO implement - /// - protected override IndexOp GetIndexOp(TEvent @event) => IndexOp.IndexNoParams; + protected override IndexOp GetIndexOp(TEvent @event) + { + var indexTime = Options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now; + if (Options.IndexOffset.HasValue) indexTime = indexTime.ToOffset(Options.IndexOffset.Value); + + var index = _skipIndexName ? string.Empty : string.Format(Options.IndexFormat, indexTime); + + var id = Options.BulkOperationIdLookup?.Invoke(@event); + if (string.IsNullOrWhiteSpace(index) && string.IsNullOrWhiteSpace(id)) + return Options.OperationMode == OperationMode.Index + ? IndexOp.IndexNoParams + : IndexOp.CreateNoParams; + + return Options.OperationMode == OperationMode.Index + ? IndexOp.Index + : IndexOp.Create; + } /// - protected override void MutateHeader(ref readonly BulkHeader header) => throw new NotImplementedException(); + protected override void MutateHeader(TEvent @event, ref BulkHeader header) + { + var indexTime = Options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now; + if (Options.IndexOffset.HasValue) indexTime = indexTime.ToOffset(Options.IndexOffset.Value); + + var index = _skipIndexName ? string.Empty : string.Format(Options.IndexFormat, indexTime); + header.Index = index; + + var id = Options.BulkOperationIdLookup?.Invoke(@event); + header.Id = id; + } /// protected override string TemplateName { get; } diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs index b66ed31..b5dd325 100644 --- a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs +++ b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs @@ -8,13 +8,6 @@ namespace Elastic.Ingest.Elasticsearch.Serialization; -/// -/// -/// -public struct BulkOperationHeader2 -{ -} - /// Represents the _bulk operation meta header public abstract class BulkOperationHeader { diff --git a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs index c8aedc6..a1b16b8 100644 --- a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs @@ -28,9 +28,6 @@ public async Task EnsureDocumentsEndUpInDataStream() var slim = new CountdownEvent(1); var options = new DataStreamChannelOptions(Client.Transport) { -#pragma warning disable CS0618 - UseReadOnlyMemory = true, -#pragma warning restore CS0618 DataStream = targetDataStream, BufferOptions = new BufferOptions { WaitHandle = slim, OutboundBufferMaxSize = 1 } }; diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/Setup.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/Setup.cs index 1a9cb12..284a37e 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/Setup.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/Setup.cs @@ -58,9 +58,6 @@ public TestSession(DistributedTransport transport) ChannelOptions = new IndexChannelOptions(transport) { BufferOptions = BufferOptions, -#pragma warning disable CS0618 - UseReadOnlyMemory = true, -#pragma warning restore CS0618 ServerRejectionCallback = (_) => Interlocked.Increment(ref _rejections), ExportItemsAttemptCallback = (_, _) => Interlocked.Increment(ref _requests), ExportResponseCallback = (_, _) => Interlocked.Increment(ref _responses), From 9991c38c2b89c79f722050ca4ca421532533c73f Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 8 Oct 2024 15:08:30 +0200 Subject: [PATCH 04/10] remove old BulkOperationHeader.cs --- ...kRequestCreationForDataStreamBenchmarks.cs | 1 - .../DataStreams/DataStreamChannel.cs | 5 +- .../ElasticsearchChannelBase.Bytes.cs | 88 ++--------------- .../Indices/IndexChannel.cs | 10 +- .../Serialization/BulkHeader.cs | 25 +++++ .../Serialization/BulkOperationHeader.cs | 98 ------------------- .../Serialization/HeaderSerialization.cs | 22 +++++ 7 files changed, 63 insertions(+), 186 deletions(-) create mode 100644 src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs delete mode 100644 src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs create mode 100644 src/Elastic.Ingest.Elasticsearch/Serialization/HeaderSerialization.cs diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs index 27f4adc..4452534 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs @@ -18,7 +18,6 @@ public class BulkRequestCreationForDataStreamBenchmarks private ITransport? _transport; private TransportConfiguration? _transportConfiguration; private StockData[] _data = Array.Empty(); - private readonly BulkOperationHeader _bulkOperationHeader = new CreateOperation(); public Stream MemoryStream { get; } = new MemoryStream(); diff --git a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs index 1553eee..ffa6315 100644 --- a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs @@ -13,7 +13,6 @@ namespace Elastic.Ingest.Elasticsearch.DataStreams; /// A channel to push messages to Elasticsearch data streams public class DataStreamChannel : ElasticsearchChannelBase> { - private readonly CreateOperation _fixedHeader; private readonly string _url; /// @@ -25,12 +24,10 @@ public DataStreamChannel(DataStreamChannelOptions options, ICollection - protected override IndexOp GetIndexOp(TEvent @event) => IndexOp.CreateNoParams; + protected override HeaderSerialization GetIndexOp(TEvent @event) => HeaderSerialization.CreateNoParams; /// protected override void MutateHeader(TEvent @event, ref BulkHeader header) { } diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs index d54902b..e7c386f 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs @@ -17,34 +17,6 @@ namespace Elastic.Ingest.Elasticsearch; -/// TODO -public enum IndexOp -{ - /// - Index, - /// - IndexNoParams, - /// - Create, - /// - CreateNoParams, - /// - Delete, - /// - Update, -} - -/// TODO -public struct BulkHeader -{ - - /// TODO - public string Index { get; set; } - - /// TODO - public string? Id { get; set; } -} - /// /// An abstract base class for both and /// Coordinates most of the sending to- and bootstrapping of Elasticsearch @@ -53,7 +25,7 @@ public abstract partial class ElasticsearchChannelBase where TChannelOptions : ElasticsearchChannelOptionsBase { /// TODO - protected abstract IndexOp GetIndexOp(TEvent @event); + protected abstract HeaderSerialization GetIndexOp(TEvent @event); /// /// @@ -88,16 +60,16 @@ public async Task WriteBufferToStreamAsync( var op = GetIndexOp(@event); switch (op) { - case IndexOp.IndexNoParams: - await ElasticsearchChannelBase.SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false); + case HeaderSerialization.IndexNoParams: + await SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false); break; - case IndexOp.CreateNoParams: + case HeaderSerialization.CreateNoParams: await SerializePlainCreateHeaderAsync(stream, ctx).ConfigureAwait(false); break; - case IndexOp.Index: - case IndexOp.Create: - case IndexOp.Delete: - case IndexOp.Update: + case HeaderSerialization.Index: + case HeaderSerialization.Create: + case HeaderSerialization.Delete: + case HeaderSerialization.Update: var header = new BulkHeader(); MutateHeader(@event, ref header); await SerializeHeaderAsync(stream, ref header, SerializerOptions, ctx).ConfigureAwait(false); @@ -106,7 +78,7 @@ public async Task WriteBufferToStreamAsync( await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false); - if (op == IndexOp.Update) + if (op == HeaderSerialization.Update) await stream.WriteAsync(DocUpdateHeaderStart, 0, DocUpdateHeaderStart.Length, ctx).ConfigureAwait(false); if (options.EventWriter?.WriteToStreamAsync != null) @@ -115,50 +87,10 @@ public async Task WriteBufferToStreamAsync( await JsonSerializer.SerializeAsync(stream, @event, SerializerOptions, ctx) .ConfigureAwait(false); - if (op == IndexOp.Update) + if (op == HeaderSerialization.Update) await stream.WriteAsync(DocUpdateHeaderEnd, 0, DocUpdateHeaderEnd.Length, ctx).ConfigureAwait(false); await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false); } } - - /// - /// Create the bulk operation header with the appropriate action and meta data for a bulk request targeting an index. - /// - /// The for which the header will be produced. - /// The for the channel. - /// Control whether the index name is included in the meta data for the operation. - /// A instance. - public static BulkOperationHeader CreateBulkOperationHeaderForIndex(TEvent @event, IndexChannelOptions options, - bool skipIndexName = false) - { - var indexTime = options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now; - if (options.IndexOffset.HasValue) indexTime = indexTime.ToOffset(options.IndexOffset.Value); - - var index = skipIndexName ? string.Empty : string.Format(options.IndexFormat, indexTime); - - var id = options.BulkOperationIdLookup?.Invoke(@event); - - if (options.OperationMode == OperationMode.Index) - { - return skipIndexName - ? !string.IsNullOrWhiteSpace(id) ? new IndexOperation { Id = id } : new IndexOperation() - : !string.IsNullOrWhiteSpace(id) ? new IndexOperation { Index = index, Id = id } : new IndexOperation { Index = index }; - } - - if (options.OperationMode == OperationMode.Create) - { - return skipIndexName - ? !string.IsNullOrWhiteSpace(id) ? new CreateOperation { Id = id } : new CreateOperation() - : !string.IsNullOrWhiteSpace(id) ? new CreateOperation { Index = index, Id = id } : new CreateOperation { Index = index }; - } - - if (!string.IsNullOrWhiteSpace(id) && id != null && (options.BulkUpsertLookup?.Invoke(@event, id) ?? false)) - return skipIndexName ? new UpdateOperation { Id = id } : new UpdateOperation { Id = id, Index = index }; - - return - !string.IsNullOrWhiteSpace(id) - ? skipIndexName ? new IndexOperation { Id = id } : new IndexOperation { Index = index, Id = id } - : skipIndexName ? new CreateOperation() : new CreateOperation { Index = index }; - } } diff --git a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs index bea243f..8c07ed1 100644 --- a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs @@ -42,7 +42,7 @@ public IndexChannel(IndexChannelOptions options, ICollection _url; /// - protected override IndexOp GetIndexOp(TEvent @event) + protected override HeaderSerialization GetIndexOp(TEvent @event) { var indexTime = Options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now; if (Options.IndexOffset.HasValue) indexTime = indexTime.ToOffset(Options.IndexOffset.Value); @@ -52,12 +52,12 @@ protected override IndexOp GetIndexOp(TEvent @event) var id = Options.BulkOperationIdLookup?.Invoke(@event); if (string.IsNullOrWhiteSpace(index) && string.IsNullOrWhiteSpace(id)) return Options.OperationMode == OperationMode.Index - ? IndexOp.IndexNoParams - : IndexOp.CreateNoParams; + ? HeaderSerialization.IndexNoParams + : HeaderSerialization.CreateNoParams; return Options.OperationMode == OperationMode.Index - ? IndexOp.Index - : IndexOp.Create; + ? HeaderSerialization.Index + : HeaderSerialization.Create; } /// diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs new file mode 100644 index 0000000..23a46f6 --- /dev/null +++ b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs @@ -0,0 +1,25 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Collections.Generic; +using Elastic.Ingest.Elasticsearch.DataStreams; + +namespace Elastic.Ingest.Elasticsearch.Serialization; + +/// TODO +public struct BulkHeader +{ + + /// The index to write to, never set when writing using + public string? Index { get; set; } + + /// The id of the object being written, never set when writing using + public string? Id { get; set; } + + /// Require to point to an alias, never set when writing using + public bool? RequireAlias { get; set; } + + /// TODO + public Dictionary? DynamicTemplates { get; init; } +} diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs deleted file mode 100644 index b5dd325..0000000 --- a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs +++ /dev/null @@ -1,98 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information -using System; -using System.Collections.Generic; -using System.Text.Json; -using System.Text.Json.Serialization; - -namespace Elastic.Ingest.Elasticsearch.Serialization; - -/// Represents the _bulk operation meta header -public abstract class BulkOperationHeader -{ - /// The index or data stream to write to - [JsonPropertyName("_index")] - public string? Index { get; init; } - - /// The id of the object being written - [JsonPropertyName("_id")] - public string? Id { get; init; } - - /// Require to point to an alias - [JsonPropertyName("require_alias")] - public bool? RequireAlias { get; init; } -} - -/// Represents the _bulk create operation meta header -[JsonConverter(typeof(BulkOperationHeaderConverter))] -public class CreateOperation : BulkOperationHeader -{ - /// - [JsonPropertyName("dynamic_templates")] - public Dictionary? DynamicTemplates { get; init; } -} - -/// Represents the _bulk index operation meta header -[JsonConverter(typeof(BulkOperationHeaderConverter))] -public class IndexOperation : BulkOperationHeader -{ - /// - [JsonPropertyName("dynamic_templates")] - public Dictionary? DynamicTemplates { get; init; } -} - -/// Represents the _bulk delete operation meta header -[JsonConverter(typeof(BulkOperationHeaderConverter))] -public class DeleteOperation : BulkOperationHeader -{ -} - -/// Represents the _bulk update operation meta header -[JsonConverter(typeof(BulkOperationHeaderConverter))] -public class UpdateOperation : BulkOperationHeader -{ -} - -internal class BulkOperationHeaderConverter : JsonConverter - where THeader : BulkOperationHeader -{ - public override THeader Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) => - throw new NotImplementedException(); - - public override void Write(Utf8JsonWriter writer, THeader value, JsonSerializerOptions options) - { - var op = value switch - { - CreateOperation _ => "create", - DeleteOperation _ => "delete", - IndexOperation _ => "index", - UpdateOperation _ => "update", - _ => throw new ArgumentOutOfRangeException(nameof(value), value, null) - }; - writer.WriteStartObject(); - writer.WritePropertyName(op); - writer.WriteStartObject(); - if (!string.IsNullOrWhiteSpace(value.Index)) - writer.WriteString("_index", value.Index); - if (!string.IsNullOrWhiteSpace(value.Id)) - writer.WriteString("_id", value.Id); - if (value.RequireAlias == true) - writer.WriteBoolean("require_alias", true); - if (value is CreateOperation c) - WriteDynamicTemplates(writer, options, c.DynamicTemplates); - if (value is IndexOperation i) - WriteDynamicTemplates(writer, options, i.DynamicTemplates); - - writer.WriteEndObject(); - writer.WriteEndObject(); - } - - private static void WriteDynamicTemplates(Utf8JsonWriter writer, JsonSerializerOptions options, Dictionary? templates) - { - if (templates is not { Count: > 0 }) return; - - writer.WritePropertyName("dynamic_templates"); - JsonSerializer.Serialize(writer, templates, options); - } -} diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/HeaderSerialization.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/HeaderSerialization.cs new file mode 100644 index 0000000..3ee7407 --- /dev/null +++ b/src/Elastic.Ingest.Elasticsearch/Serialization/HeaderSerialization.cs @@ -0,0 +1,22 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +namespace Elastic.Ingest.Elasticsearch.Serialization; + +/// TODO +public enum HeaderSerialization +{ + /// + Index, + /// + IndexNoParams, + /// + Create, + /// + CreateNoParams, + /// + Delete, + /// + Update, +} From 9c94df3d0f8eb0781018d00f9423d14940ada8b1 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 8 Oct 2024 15:39:43 +0200 Subject: [PATCH 05/10] Remove last reflection based code path --- src/Elastic.Channels/BufferedChannelBase.cs | 14 ++++----- .../Diagnostics/NoopBufferedChannel.cs | 4 +-- .../ResponseItemsBufferedChannelBase.cs | 8 ++--- src/Elastic.Ingest.Apm/ApmChannel.cs | 2 +- .../DataStreams/DataStreamChannel.cs | 3 +- .../ElasticsearchChannelBase.Serialization.cs | 2 -- .../ElasticsearchChannelBase.cs | 30 ++++++------------- .../ElasticsearchChannelOptionsBase.cs | 6 ++++ .../Indices/IndexChannel.cs | 3 +- .../Serialization/BulkHeader.cs | 1 - .../CustomOtlpTraceExporter.cs | 2 +- .../TransportChannelBase.cs | 8 ++--- .../DataStreamChannelTests.cs | 11 ++++++- .../IndexChannelTests.cs | 10 ++++++- .../SubPathTests.cs | 10 ++++++- 15 files changed, 66 insertions(+), 48 deletions(-) diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index 1deaf41..e340018 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -122,26 +122,26 @@ public abstract class BufferedChannelBase internal InboundBuffer InboundBuffer { get; } /// - protected BufferedChannelBase(TChannelOptions options) : this(options, null) { } + protected BufferedChannelBase(TChannelOptions options, string diagnosticsName) : this(options, null, diagnosticsName) { } /// - protected BufferedChannelBase(TChannelOptions options, ICollection>? callbackListeners) + protected BufferedChannelBase(TChannelOptions options, ICollection>? callbackListeners, string diagnosticsName) { TokenSource = options.CancellationToken.HasValue ? CancellationTokenSource.CreateLinkedTokenSource(options.CancellationToken.Value) : new CancellationTokenSource(); Options = options; - var listeners = callbackListeners == null ? new[] { Options } : callbackListeners.Concat(new[] { Options }).ToArray(); + var listeners = callbackListeners == null ? [Options] : callbackListeners.Concat([Options]).ToArray(); DiagnosticsListener = listeners - .Select(l => (l is IChannelDiagnosticsListener c) ? c : null) - .FirstOrDefault(e => e != null); + .OfType() + .FirstOrDefault(); if (DiagnosticsListener == null && !options.DisableDiagnostics) { // if no debug listener was already provided but was requested explicitly create one. - var l = new ChannelDiagnosticsListener(GetType().Name); + var l = new ChannelDiagnosticsListener(diagnosticsName); DiagnosticsListener = l; - listeners = listeners.Concat(new[] { l }).ToArray(); + listeners = listeners.Concat([l]).ToArray(); } _callbacks = new ChannelCallbackInvoker(listeners); diff --git a/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs index 4c461bf..c2adaeb 100644 --- a/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs +++ b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs @@ -38,14 +38,14 @@ public class NoopChannelOptions : ChannelOptionsBase public NoopBufferedChannel( NoopChannelOptions options, ICollection>? channelListeners = null - ) : base(options, channelListeners) { } + ) : base(options, channelListeners, nameof(NoopBufferedChannel)) { } /// public NoopBufferedChannel( BufferOptions options, ICollection>? channelListeners = null, bool observeConcurrency = false - ) : base(new NoopChannelOptions { BufferOptions = options, TrackConcurrency = observeConcurrency }, channelListeners) + ) : base(new NoopChannelOptions { BufferOptions = options, TrackConcurrency = observeConcurrency }, channelListeners, nameof(NoopBufferedChannel)) { } diff --git a/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs b/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs index e480d10..54e76ea 100644 --- a/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs +++ b/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs @@ -32,12 +32,12 @@ public abstract class ResponseItemsBufferedChannelBase - protected ResponseItemsBufferedChannelBase(TChannelOptions options, ICollection>? callbackListeners) - : base(options, callbackListeners) { } + protected ResponseItemsBufferedChannelBase(TChannelOptions options, ICollection>? callbackListeners, string diagnosticsName) + : base(options, callbackListeners, diagnosticsName) { } /// - protected ResponseItemsBufferedChannelBase(TChannelOptions options) - : base(options) { } + protected ResponseItemsBufferedChannelBase(TChannelOptions options, string diagnosticsName) + : base(options, diagnosticsName) { } /// Based on should return a bool indicating if retry is needed protected abstract bool Retry(TResponse response); diff --git a/src/Elastic.Ingest.Apm/ApmChannel.cs b/src/Elastic.Ingest.Apm/ApmChannel.cs index caf6077..0ee8477 100644 --- a/src/Elastic.Ingest.Apm/ApmChannel.cs +++ b/src/Elastic.Ingest.Apm/ApmChannel.cs @@ -40,7 +40,7 @@ internal static class ApmChannelStatics public class ApmChannel : TransportChannelBase { /// - public ApmChannel(ApmChannelOptions options) : base(options) { } + public ApmChannel(ApmChannelOptions options) : base(options, nameof(ApmChannel)) { } //retry if APM server returns 429 /// diff --git a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs index ffa6315..44f0af7 100644 --- a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs @@ -19,7 +19,8 @@ public class DataStreamChannel : ElasticsearchChannelBase options) : this(options, null) { } /// - public DataStreamChannel(DataStreamChannelOptions options, ICollection>? callbackListeners) : base(options, callbackListeners) + public DataStreamChannel(DataStreamChannelOptions options, ICollection>? callbackListeners, string? diagnosticsName = null) + : base(options, callbackListeners, diagnosticsName ?? nameof(DataStreamChannel)) { var dataStream = Options.DataStream.ToString(); diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs index 953a360..8120783 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs @@ -10,7 +10,6 @@ using Elastic.Ingest.Elasticsearch.DataStreams; using Elastic.Ingest.Elasticsearch.Indices; using Elastic.Ingest.Elasticsearch.Serialization; -using Elastic.Ingest.Transport; namespace Elastic.Ingest.Elasticsearch; @@ -19,7 +18,6 @@ namespace Elastic.Ingest.Elasticsearch; /// Coordinates most of the sending to- and bootstrapping of Elasticsearch /// public abstract partial class ElasticsearchChannelBase - : TransportChannelBase where TChannelOptions : ElasticsearchChannelOptionsBase { private static ReadOnlySpan PlainIndexBytesSpan => """ diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs index 8c6d39e..8a7284c 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs @@ -30,12 +30,12 @@ public abstract partial class ElasticsearchChannelBase where TChannelOptions : ElasticsearchChannelOptionsBase { /// - protected ElasticsearchChannelBase(TChannelOptions options, ICollection>? callbackListeners) - : base(options, callbackListeners) { } + protected ElasticsearchChannelBase(TChannelOptions options, ICollection>? callbackListeners, string diagnosticsName) + : base(options, callbackListeners, diagnosticsName) { } /// - protected ElasticsearchChannelBase(TChannelOptions options) - : base(options) { } + protected ElasticsearchChannelBase(TChannelOptions options, string diagnosticsName) + : base(options, diagnosticsName) { } /// protected override bool Retry(BulkResponse response) @@ -70,34 +70,22 @@ protected override bool RejectEvent((TEvent, BulkResponseItem) @event) => protected override Task ExportAsync(ITransport transport, ArraySegment page, CancellationToken ctx = default) { ctx = ctx == default ? TokenSource.Token : ctx; -#if NETSTANDARD2_1 - // Option is obsolete to prevent external users to set it. -#pragma warning disable CS0618 -// if (Options.UseReadOnlyMemory) -// #pragma warning restore CS0618 -// { -// var bytes = BulkRequestDataFactory.GetBytes(page, Options, CreateBulkOperationHeader); -// return transport.RequestAsync(HttpMethod.POST, BulkUrl, PostData.ReadOnlyMemory(bytes), RequestParams, ctx); -// } -#endif -#pragma warning disable IDE0022 // Use expression body for method return transport.RequestAsync(HttpMethod.POST, BulkUrl, PostData.StreamHandler(page, (_, _) => { - /* NOT USED */ + /* Synchronous code path never called */ }, - async (b, stream, ctx) => { await WriteBufferToStreamAsync(b, stream, Options, ctx).ConfigureAwait(false); }) + async (b, stream, t) => await WriteBufferToStreamAsync(b, stream, Options, t).ConfigureAwait(false)) , RequestParams, ctx); -#pragma warning restore IDE0022 // Use expression body for method } /// - protected class HeadIndexTemplateResponse : ElasticsearchResponse { } + protected class HeadIndexTemplateResponse : ElasticsearchResponse; /// - protected class PutIndexTemplateResponse : ElasticsearchResponse { } + protected class PutIndexTemplateResponse : ElasticsearchResponse; /// - protected class PutComponentTemplateResponse : ElasticsearchResponse { } + protected class PutComponentTemplateResponse : ElasticsearchResponse; } diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs index 758deed..e77b02d 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs @@ -3,6 +3,8 @@ // See the LICENSE file in the project root for more information using System; +using System.Collections; +using System.Collections.Generic; using Elastic.Ingest.Elasticsearch.Serialization; using Elastic.Ingest.Transport; using Elastic.Transport; @@ -22,4 +24,8 @@ protected ElasticsearchChannelOptionsBase(ITransport transport) : base(transport /// public IElasticsearchEventWriter? EventWriter { get; set; } + /// Optionally set dynamic templates for event + public Func>? DynamicTemplates { get; set; } + + } diff --git a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs index 8c07ed1..9646965 100644 --- a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs @@ -22,7 +22,8 @@ public class IndexChannel : ElasticsearchChannelBase options) : this(options, null) { } /// - public IndexChannel(IndexChannelOptions options, ICollection>? callbackListeners) : base(options, callbackListeners) + public IndexChannel(IndexChannelOptions options, ICollection>? callbackListeners, string? diagnosticsName = null) + : base(options, callbackListeners, diagnosticsName ?? nameof(IndexChannel)) { _url = base.BulkUrl; diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs index 23a46f6..a5e429a 100644 --- a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs +++ b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs @@ -10,7 +10,6 @@ namespace Elastic.Ingest.Elasticsearch.Serialization; /// TODO public struct BulkHeader { - /// The index to write to, never set when writing using public string? Index { get; set; } diff --git a/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs b/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs index 03d8538..7233794 100644 --- a/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs +++ b/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs @@ -64,7 +64,7 @@ public TraceChannel(TraceChannelOptions options) : this(options, null) { } /// public TraceChannel(TraceChannelOptions options, ICollection>? callbackListeners) - : base(options, callbackListeners) { + : base(options, callbackListeners, nameof(TraceChannel)) { var o = new OtlpExporterOptions { Endpoint = options.Endpoint, diff --git a/src/Elastic.Ingest.Transport/TransportChannelBase.cs b/src/Elastic.Ingest.Transport/TransportChannelBase.cs index 98515aa..2b33841 100644 --- a/src/Elastic.Ingest.Transport/TransportChannelBase.cs +++ b/src/Elastic.Ingest.Transport/TransportChannelBase.cs @@ -23,12 +23,12 @@ public abstract class TransportChannelBase - protected TransportChannelBase(TChannelOptions options, ICollection>? callbackListeners) - : base(options, callbackListeners) { } + protected TransportChannelBase(TChannelOptions options, ICollection>? callbackListeners, string diagnosticsName) + : base(options, callbackListeners, diagnosticsName) { } /// - protected TransportChannelBase(TChannelOptions options) - : base(options) { } + protected TransportChannelBase(TChannelOptions options, string diagnosticsName) + : base(options, diagnosticsName) { } /// Implement sending the current of the buffer to the output. /// diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs index f1dc0e5..3f7987e 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs @@ -2,6 +2,7 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information +using System; using System.IO; using System.Threading; using Elastic.Ingest.Elasticsearch.DataStreams; @@ -20,6 +21,7 @@ public void DataStreamChannel_UsesCorrectUrlAndOperationHeader() var wait = new ManualResetEvent(false); + Exception exception = null; using var channel = new DataStreamChannel(new DataStreamChannelOptions(Transport) { BufferOptions = new() @@ -27,6 +29,11 @@ public void DataStreamChannel_UsesCorrectUrlAndOperationHeader() OutboundBufferMaxSize = 1 }, DataStream = new("type"), + ExportExceptionCallback = e => + { + exception = e; + wait.Set(); + }, ExportResponseCallback = (response, _) => { callDetails = response.ApiCallDetails; @@ -35,7 +42,9 @@ public void DataStreamChannel_UsesCorrectUrlAndOperationHeader() }); channel.TryWrite(new TestDocument()); - wait.WaitOne(); + var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("because ExportResponseCallback should have been called"); + exception.Should().BeNull(); callDetails.Uri.AbsolutePath.Should().Be("/type-generic-default/_bulk"); diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs index c44a8c7..7e1cbae 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs @@ -28,12 +28,18 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader var wait = new ManualResetEvent(false); + Exception exception = null; var options = new IndexChannelOptions(Transport) { BufferOptions = new() { OutboundBufferMaxSize = 1 }, + ExportExceptionCallback = e => + { + exception = e; + wait.Set(); + }, ExportResponseCallback = (response, _) => { callDetails = response.ApiCallDetails; @@ -50,7 +56,9 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader using var channel = new IndexChannel(options); channel.TryWrite(new TestDocument()); - wait.WaitOne(); + var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("because ExportResponseCallback should have been called"); + exception.Should().BeNull(); callDetails.Should().NotBeNull(); callDetails.Uri.AbsolutePath.Should().Be(expectedUrl); diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs index cef3ae6..bb34c52 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs @@ -29,12 +29,18 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader ApiCallDetails callDetails = null; var wait = new ManualResetEvent(false); + Exception exception = null; var options = new IndexChannelOptions(Transport) { BufferOptions = new() { OutboundBufferMaxSize = 1 }, + ExportExceptionCallback = e => + { + exception = e; + wait.Set(); + }, ExportResponseCallback = (response, _) => { callDetails = response.ApiCallDetails; @@ -51,7 +57,9 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader using var channel = new IndexChannel(options); channel.TryWrite(new TestDocument()); - wait.WaitOne(); + var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("because ExportResponseCallback should have been called"); + exception.Should().BeNull(); callDetails.Should().NotBeNull(); callDetails.Uri.AbsolutePath.Should().Be(expectedUrl); From 33e586775facab5cd41c9c2b51a288a73639353a Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Wed, 9 Oct 2024 12:53:56 +0200 Subject: [PATCH 06/10] Ensure bulk header options can be set --- .../DataStreams/DataStreamChannel.cs | 19 ++++++-- .../ElasticsearchChannelBase.Bytes.cs | 27 ++++------- .../ElasticsearchChannelBase.Serialization.cs | 2 +- .../ElasticsearchChannelOptionsBase.cs | 4 +- .../Indices/IndexChannel.cs | 47 ++++++++++--------- .../Indices/IndexChannelOptions.cs | 3 ++ .../Serialization/BulkHeader.cs | 19 +++++--- .../Serialization/HeaderSerialization.cs | 2 +- .../DataStreamIngestionTests.cs | 2 +- 9 files changed, 72 insertions(+), 53 deletions(-) diff --git a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs index 44f0af7..eeafe7f 100644 --- a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs @@ -27,11 +27,22 @@ public DataStreamChannel(DataStreamChannelOptions options, ICollection - protected override HeaderSerialization GetIndexOp(TEvent @event) => HeaderSerialization.CreateNoParams; + /// + protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TEvent @event) + { + + var listExecutedPipelines = Options.ListExecutedPipelines?.Invoke(@event); + var templates = Options.DynamicTemplateLookup?.Invoke(@event); + if (templates is null && listExecutedPipelines is null) + return (HeaderSerializationStrategy.CreateNoParams, null); - /// - protected override void MutateHeader(TEvent @event, ref BulkHeader header) { } + var header = new BulkHeader + { + DynamicTemplates = templates, + ListExecutedPipelines = listExecutedPipelines + }; + return (HeaderSerializationStrategy.Create, header); + } /// protected override string TemplateName => Options.DataStream.GetTemplateName(); diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs index e7c386f..d1919eb 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs @@ -25,12 +25,7 @@ public abstract partial class ElasticsearchChannelBase where TChannelOptions : ElasticsearchChannelOptionsBase { /// TODO - protected abstract HeaderSerialization GetIndexOp(TEvent @event); - - /// - /// - /// - protected abstract void MutateHeader(TEvent @event, ref BulkHeader header); + protected abstract (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TEvent @event); /// /// Asynchronously write the NDJSON request body for a page of events to . @@ -57,28 +52,26 @@ public async Task WriteBufferToStreamAsync( var @event = items[i]; if (@event == null) continue; - var op = GetIndexOp(@event); + var (op, header) = EventIndexStrategy(@event); switch (op) { - case HeaderSerialization.IndexNoParams: + case HeaderSerializationStrategy.IndexNoParams: await SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false); break; - case HeaderSerialization.CreateNoParams: + case HeaderSerializationStrategy.CreateNoParams: await SerializePlainCreateHeaderAsync(stream, ctx).ConfigureAwait(false); break; - case HeaderSerialization.Index: - case HeaderSerialization.Create: - case HeaderSerialization.Delete: - case HeaderSerialization.Update: - var header = new BulkHeader(); - MutateHeader(@event, ref header); + case HeaderSerializationStrategy.Index: + case HeaderSerializationStrategy.Create: + case HeaderSerializationStrategy.Delete: + case HeaderSerializationStrategy.Update: await SerializeHeaderAsync(stream, ref header, SerializerOptions, ctx).ConfigureAwait(false); break; } await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false); - if (op == HeaderSerialization.Update) + if (op == HeaderSerializationStrategy.Update) await stream.WriteAsync(DocUpdateHeaderStart, 0, DocUpdateHeaderStart.Length, ctx).ConfigureAwait(false); if (options.EventWriter?.WriteToStreamAsync != null) @@ -87,7 +80,7 @@ public async Task WriteBufferToStreamAsync( await JsonSerializer.SerializeAsync(stream, @event, SerializerOptions, ctx) .ConfigureAwait(false); - if (op == HeaderSerialization.Update) + if (op == HeaderSerializationStrategy.Update) await stream.WriteAsync(DocUpdateHeaderEnd, 0, DocUpdateHeaderEnd.Length, ctx).ConfigureAwait(false); await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false); diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs index 8120783..5d7cbc9 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs @@ -37,7 +37,7 @@ public abstract partial class ElasticsearchChannelBase private static byte[] PlainCreateBytes => PlainCreateBytesSpan.ToArray(); #endif - private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader header, JsonSerializerOptions serializerOptions, CancellationToken ctx) => + private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader? header, JsonSerializerOptions serializerOptions, CancellationToken ctx) => throw new NotImplementedException(); diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs index e77b02d..3d3b52b 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs @@ -25,7 +25,9 @@ protected ElasticsearchChannelOptionsBase(ITransport transport) : base(transport public IElasticsearchEventWriter? EventWriter { get; set; } /// Optionally set dynamic templates for event - public Func>? DynamicTemplates { get; set; } + public Func?>? DynamicTemplateLookup { get; set; } + /// If true, the response will include the ingest pipelines that were executed. Defaults to false. + public Func? ListExecutedPipelines { get; set; } } diff --git a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs index 9646965..49b6ca6 100644 --- a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs @@ -7,6 +7,7 @@ using Elastic.Ingest.Elasticsearch.DataStreams; using Elastic.Ingest.Elasticsearch.Serialization; using Elastic.Ingest.Transport; +using static Elastic.Ingest.Elasticsearch.Serialization.HeaderSerializationStrategy; namespace Elastic.Ingest.Elasticsearch.Indices; @@ -42,36 +43,38 @@ public IndexChannel(IndexChannelOptions options, ICollection protected override string BulkUrl => _url; - /// - protected override HeaderSerialization GetIndexOp(TEvent @event) + /// + protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TEvent @event) { var indexTime = Options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now; if (Options.IndexOffset.HasValue) indexTime = indexTime.ToOffset(Options.IndexOffset.Value); var index = _skipIndexName ? string.Empty : string.Format(Options.IndexFormat, indexTime); - var id = Options.BulkOperationIdLookup?.Invoke(@event); - if (string.IsNullOrWhiteSpace(index) && string.IsNullOrWhiteSpace(id)) + var templates = Options.DynamicTemplateLookup?.Invoke(@event); + var requireAlias = Options.RequireAlias?.Invoke(@event); + var listExecutedPipelines = Options.ListExecutedPipelines?.Invoke(@event); + if (string.IsNullOrWhiteSpace(index) + && string.IsNullOrWhiteSpace(id) + && templates is null + && requireAlias is null + && listExecutedPipelines is null) return Options.OperationMode == OperationMode.Index - ? HeaderSerialization.IndexNoParams - : HeaderSerialization.CreateNoParams; - - return Options.OperationMode == OperationMode.Index - ? HeaderSerialization.Index - : HeaderSerialization.Create; - } + ? (IndexNoParams, null) + : (CreateNoParams, null); - /// - protected override void MutateHeader(TEvent @event, ref BulkHeader header) - { - var indexTime = Options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now; - if (Options.IndexOffset.HasValue) indexTime = indexTime.ToOffset(Options.IndexOffset.Value); - - var index = _skipIndexName ? string.Empty : string.Format(Options.IndexFormat, indexTime); - header.Index = index; - - var id = Options.BulkOperationIdLookup?.Invoke(@event); - header.Id = id; + var header = new BulkHeader + { + Id = id, + Index = index, + DynamicTemplates = templates, + RequireAlias = requireAlias, + ListExecutedPipelines = listExecutedPipelines + }; + + return (Options.OperationMode == OperationMode.Index + ? HeaderSerializationStrategy.Index + : Create, header); } /// diff --git a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs index ddc9ff1..c0758e2 100644 --- a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs +++ b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs @@ -43,6 +43,9 @@ public IndexChannelOptions(ITransport transport) : base(transport) { } /// public Func? BulkOperationIdLookup { get; set; } + /// If true, the action must target an index alias. Defaults to false. + public Func? RequireAlias { get; set; } + /// /// Uses the callback provided to to determine if this is in fact an update operation /// If this returns true the document will be sent as an upsert operation diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs index a5e429a..6a3ee48 100644 --- a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs +++ b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs @@ -8,17 +8,24 @@ namespace Elastic.Ingest.Elasticsearch.Serialization; /// TODO -public struct BulkHeader +public readonly struct BulkHeader { /// The index to write to, never set when writing using - public string? Index { get; set; } + public string? Index { get; init; } /// The id of the object being written, never set when writing using - public string? Id { get; set; } + public string? Id { get; init; } /// Require to point to an alias, never set when writing using - public bool? RequireAlias { get; set; } + public bool? RequireAlias { get; init; } - /// TODO - public Dictionary? DynamicTemplates { get; init; } + /// + /// A map from the full name of fields to the name of dynamic templates. Defaults to an empty map. If a name matches a dynamic template, + /// then that template will be applied regardless of other match predicates defined in the template. And if a field is already defined + /// in the mapping, then this parameter won’t be used. + /// + public IDictionary? DynamicTemplates { get; init; } + + /// If true, the response will include the ingest pipelines that were executed. Defaults to false. + public bool? ListExecutedPipelines { get; init; } } diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/HeaderSerialization.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/HeaderSerialization.cs index 3ee7407..f90a1d7 100644 --- a/src/Elastic.Ingest.Elasticsearch/Serialization/HeaderSerialization.cs +++ b/src/Elastic.Ingest.Elasticsearch/Serialization/HeaderSerialization.cs @@ -5,7 +5,7 @@ namespace Elastic.Ingest.Elasticsearch.Serialization; /// TODO -public enum HeaderSerialization +public enum HeaderSerializationStrategy { /// Index, diff --git a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs index a1b16b8..2e239ef 100644 --- a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs @@ -29,7 +29,7 @@ public async Task EnsureDocumentsEndUpInDataStream() var options = new DataStreamChannelOptions(Client.Transport) { DataStream = targetDataStream, - BufferOptions = new BufferOptions { WaitHandle = slim, OutboundBufferMaxSize = 1 } + BufferOptions = new BufferOptions { WaitHandle = slim, OutboundBufferMaxSize = 1 }, }; var channel = new DataStreamChannel(options); From 39de9dc1219e7ef886b3537b003da3eb49e1b56f Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Wed, 9 Oct 2024 13:39:56 +0200 Subject: [PATCH 07/10] add tests to ensure the right strategies can be provided for indexing events --- .../DataStreams/DataStreamChannel.cs | 3 +- .../ElasticsearchChannelBase.Serialization.cs | 5 +- .../Indices/IndexChannel.cs | 15 ++- .../IndexChannelEventOptionsTests.cs | 104 ++++++++++++++++++ 4 files changed, 118 insertions(+), 9 deletions(-) create mode 100644 tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventOptionsTests.cs diff --git a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs index eeafe7f..1bebe12 100644 --- a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs @@ -30,10 +30,9 @@ public DataStreamChannel(DataStreamChannelOptions options, ICollection protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TEvent @event) { - var listExecutedPipelines = Options.ListExecutedPipelines?.Invoke(@event); var templates = Options.DynamicTemplateLookup?.Invoke(@event); - if (templates is null && listExecutedPipelines is null) + if (templates is null && listExecutedPipelines is null or false) return (HeaderSerializationStrategy.CreateNoParams, null); var header = new BulkHeader diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs index 5d7cbc9..d1a3ee5 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs @@ -37,8 +37,9 @@ public abstract partial class ElasticsearchChannelBase private static byte[] PlainCreateBytes => PlainCreateBytesSpan.ToArray(); #endif - private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader? header, JsonSerializerOptions serializerOptions, CancellationToken ctx) => - throw new NotImplementedException(); + private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader? header, JsonSerializerOptions serializerOptions, CancellationToken ctx + ) => + Task.CompletedTask; #if NET8_0_OR_GREATER diff --git a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs index 49b6ca6..1f4464d 100644 --- a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs @@ -54,11 +54,13 @@ protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy var templates = Options.DynamicTemplateLookup?.Invoke(@event); var requireAlias = Options.RequireAlias?.Invoke(@event); var listExecutedPipelines = Options.ListExecutedPipelines?.Invoke(@event); + var isUpsert = Options.BulkUpsertLookup?.Invoke(@event, index) is true; if (string.IsNullOrWhiteSpace(index) && string.IsNullOrWhiteSpace(id) && templates is null - && requireAlias is null - && listExecutedPipelines is null) + && isUpsert is false + && requireAlias is null or false + && listExecutedPipelines is null or false) return Options.OperationMode == OperationMode.Index ? (IndexNoParams, null) : (CreateNoParams, null); @@ -71,10 +73,13 @@ protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy RequireAlias = requireAlias, ListExecutedPipelines = listExecutedPipelines }; - - return (Options.OperationMode == OperationMode.Index + var op = Options.OperationMode == OperationMode.Index ? HeaderSerializationStrategy.Index - : Create, header); + : Create; + if (isUpsert) + op = Update; + + return (op, header); } /// diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventOptionsTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventOptionsTests.cs new file mode 100644 index 0000000..296cf38 --- /dev/null +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventOptionsTests.cs @@ -0,0 +1,104 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Generic; +using System.IO; +using System.Threading; +using Elastic.Channels.Diagnostics; +using Elastic.Ingest.Elasticsearch.DataStreams; +using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Ingest.Elasticsearch.Serialization; +using Elastic.Transport; +using FluentAssertions; +using Xunit; + +namespace Elastic.Ingest.Elasticsearch.Tests.Strategies; + +public class TestDocument +{ + private static int Counter = 0; + private readonly int _id = ++Counter; + + public DateTimeOffset Timestamp { get; set; } + public int Id => _id; +} + +internal class TestIndexChannel(IndexChannelOptions options) : IndexChannel(options) +{ + public List<(HeaderSerializationStrategy, BulkHeader?)> Strategies { get; } = new(); + + protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TestDocument @event) + { + var strategy = base.EventIndexStrategy(@event); + Strategies.Add(strategy); + return strategy; + } +} + +public class IndexChannelEventOptionsTests : ChannelTestWithSingleDocResponseBase +{ + [Fact] + public void DataStreamChannel_UsesCorrectUrlAndOperationHeader() + { + ApiCallDetails callDetails = null; + + var wait = new ManualResetEvent(false); + + Exception exception = null; + using var channel = new TestIndexChannel(new IndexChannelOptions(Transport) + { + IndexFormat = "test-index", + BufferOptions = new() { OutboundBufferMaxSize = 1 }, + DynamicTemplateLookup = document => document.Id == 2 ? new Dictionary { { "id", "1" } } : null, + BulkOperationIdLookup = document => document.Id == 3 ? "33" : null, + BulkUpsertLookup = (document, _) => document.Id == 4, + RequireAlias = document => document.Id == 5, + ListExecutedPipelines = document => document.Id == 6, + ExportExceptionCallback = e => + { + exception = e; + wait.Set(); + }, + ExportResponseCallback = (response, _) => + { + callDetails = response.ApiCallDetails; + wait.Set(); + } + }); + + channel.TryWrite(new TestDocument()); //0 + channel.TryWrite(new TestDocument()); //1 + channel.TryWrite(new TestDocument()); //2 + channel.TryWrite(new TestDocument()); //3 + channel.TryWrite(new TestDocument()); //4 + channel.TryWrite(new TestDocument()); //5 + var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("because ExportResponseCallback should have been called"); + exception.Should().BeNull(); + + callDetails.Uri.AbsolutePath.Should().Be("/test-index/_bulk"); + + + channel.Strategies.Should().HaveCount(6); + + channel.Strategies[0].Item1.Should().Be(HeaderSerializationStrategy.CreateNoParams); + channel.Strategies[0].Item2.Should().BeNull(); + + channel.Strategies[1].Item1.Should().Be(HeaderSerializationStrategy.Create); + channel.Strategies[1].Item2.Should().NotBeNull(); + channel.Strategies[1].Item2!.Value.DynamicTemplates.Should().NotBeNull(); + + channel.Strategies[2].Item1.Should().Be(HeaderSerializationStrategy.Create); + channel.Strategies[2].Item2.Should().NotBeNull(); + channel.Strategies[2].Item2!.Value.Id.Should().Be("33"); + + channel.Strategies[3].Item1.Should().Be(HeaderSerializationStrategy.Update); + + channel.Strategies[4].Item2!.Value.RequireAlias.Should().BeTrue(); + + channel.Strategies[5].Item2!.Value.ListExecutedPipelines.Should().BeTrue(); + + } +} From 5ba834035716d8be921d54b0242875d192b8596f Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Wed, 9 Oct 2024 13:57:16 +0200 Subject: [PATCH 08/10] Add tests for datastream bulk indexing strategies too --- .../DataStreamChannelEventStrategyTests.cs | 90 +++++++++++++++ .../IndexChannelEventOptionsTests.cs | 104 ------------------ .../IndexChannelEventStrategyTests.cs | 99 +++++++++++++++++ .../Strategies/TrackDocument.cs | 31 ++++++ 4 files changed, 220 insertions(+), 104 deletions(-) create mode 100644 tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/DataStreamChannelEventStrategyTests.cs delete mode 100644 tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventOptionsTests.cs create mode 100644 tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventStrategyTests.cs create mode 100644 tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/TrackDocument.cs diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/DataStreamChannelEventStrategyTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/DataStreamChannelEventStrategyTests.cs new file mode 100644 index 0000000..706bce1 --- /dev/null +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/DataStreamChannelEventStrategyTests.cs @@ -0,0 +1,90 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading; +using Elastic.Channels.Diagnostics; +using Elastic.Ingest.Elasticsearch.DataStreams; +using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Ingest.Elasticsearch.Serialization; +using Elastic.Transport; +using FluentAssertions; +using Xunit; + +namespace Elastic.Ingest.Elasticsearch.Tests.Strategies; + +internal class TestDataStreamChannel(DataStreamChannelOptions options) + : DataStreamChannel(options) +{ + public List Strategies { get; } = new(); + + protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(DataStreamDocument @event) + { + var strategy = base.EventIndexStrategy(@event); + Strategies.Add(new TrackStrategy + { + Id = @event.Id, + Strategy = strategy.Item1, + Header = strategy.Item2 + }); + return strategy; + } +} + +public class DataStreamChannelEventStrategyTests : ChannelTestWithSingleDocResponseBase +{ + [Fact] + public void EmitsExpectedStrategies() + { + ApiCallDetails callDetails = null; + + var wait = new ManualResetEvent(false); + + Exception exception = null; + using var channel = new TestDataStreamChannel(new DataStreamChannelOptions(Transport) + { + BufferOptions = new() { OutboundBufferMaxSize = 3 }, + DynamicTemplateLookup = document => document.Id == 2 ? new Dictionary { { "id", "1" } } : null, + ListExecutedPipelines = document => document.Id == 3, + ExportExceptionCallback = e => + { + exception = e; + wait.Set(); + }, + ExportResponseCallback = (response, _) => + { + callDetails = response.ApiCallDetails; + wait.Set(); + } + }); + + channel.TryWrite(new DataStreamDocument()); //0 + channel.TryWrite(new DataStreamDocument()); //1 + channel.TryWrite(new DataStreamDocument()); //2 + var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("because ExportResponseCallback should have been called"); + exception.Should().BeNull(); + + callDetails.Uri.AbsolutePath.Should().Be("/datastreamdocument-generic-default/_bulk"); + + + channel.Strategies.Should().HaveCount(3); + var strategies = channel.Strategies.OrderBy(s => s.Id).ToList(); + + strategies[0].Strategy.Should().Be(HeaderSerializationStrategy.CreateNoParams); + strategies[0].Header.Should().BeNull(); + + strategies[1].Strategy.Should().Be(HeaderSerializationStrategy.Create); + strategies[1].Header.Should().NotBeNull(); + strategies[1].Header!.Value.DynamicTemplates.Should().NotBeNull(); + + strategies[2].Strategy.Should().Be(HeaderSerializationStrategy.Create); + strategies[2].Header.Should().NotBeNull(); + strategies[2].Header!.Value.ListExecutedPipelines.Should().BeTrue(); + + } +} diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventOptionsTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventOptionsTests.cs deleted file mode 100644 index 296cf38..0000000 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventOptionsTests.cs +++ /dev/null @@ -1,104 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information - -using System; -using System.Collections.Generic; -using System.IO; -using System.Threading; -using Elastic.Channels.Diagnostics; -using Elastic.Ingest.Elasticsearch.DataStreams; -using Elastic.Ingest.Elasticsearch.Indices; -using Elastic.Ingest.Elasticsearch.Serialization; -using Elastic.Transport; -using FluentAssertions; -using Xunit; - -namespace Elastic.Ingest.Elasticsearch.Tests.Strategies; - -public class TestDocument -{ - private static int Counter = 0; - private readonly int _id = ++Counter; - - public DateTimeOffset Timestamp { get; set; } - public int Id => _id; -} - -internal class TestIndexChannel(IndexChannelOptions options) : IndexChannel(options) -{ - public List<(HeaderSerializationStrategy, BulkHeader?)> Strategies { get; } = new(); - - protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TestDocument @event) - { - var strategy = base.EventIndexStrategy(@event); - Strategies.Add(strategy); - return strategy; - } -} - -public class IndexChannelEventOptionsTests : ChannelTestWithSingleDocResponseBase -{ - [Fact] - public void DataStreamChannel_UsesCorrectUrlAndOperationHeader() - { - ApiCallDetails callDetails = null; - - var wait = new ManualResetEvent(false); - - Exception exception = null; - using var channel = new TestIndexChannel(new IndexChannelOptions(Transport) - { - IndexFormat = "test-index", - BufferOptions = new() { OutboundBufferMaxSize = 1 }, - DynamicTemplateLookup = document => document.Id == 2 ? new Dictionary { { "id", "1" } } : null, - BulkOperationIdLookup = document => document.Id == 3 ? "33" : null, - BulkUpsertLookup = (document, _) => document.Id == 4, - RequireAlias = document => document.Id == 5, - ListExecutedPipelines = document => document.Id == 6, - ExportExceptionCallback = e => - { - exception = e; - wait.Set(); - }, - ExportResponseCallback = (response, _) => - { - callDetails = response.ApiCallDetails; - wait.Set(); - } - }); - - channel.TryWrite(new TestDocument()); //0 - channel.TryWrite(new TestDocument()); //1 - channel.TryWrite(new TestDocument()); //2 - channel.TryWrite(new TestDocument()); //3 - channel.TryWrite(new TestDocument()); //4 - channel.TryWrite(new TestDocument()); //5 - var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); - signalled.Should().BeTrue("because ExportResponseCallback should have been called"); - exception.Should().BeNull(); - - callDetails.Uri.AbsolutePath.Should().Be("/test-index/_bulk"); - - - channel.Strategies.Should().HaveCount(6); - - channel.Strategies[0].Item1.Should().Be(HeaderSerializationStrategy.CreateNoParams); - channel.Strategies[0].Item2.Should().BeNull(); - - channel.Strategies[1].Item1.Should().Be(HeaderSerializationStrategy.Create); - channel.Strategies[1].Item2.Should().NotBeNull(); - channel.Strategies[1].Item2!.Value.DynamicTemplates.Should().NotBeNull(); - - channel.Strategies[2].Item1.Should().Be(HeaderSerializationStrategy.Create); - channel.Strategies[2].Item2.Should().NotBeNull(); - channel.Strategies[2].Item2!.Value.Id.Should().Be("33"); - - channel.Strategies[3].Item1.Should().Be(HeaderSerializationStrategy.Update); - - channel.Strategies[4].Item2!.Value.RequireAlias.Should().BeTrue(); - - channel.Strategies[5].Item2!.Value.ListExecutedPipelines.Should().BeTrue(); - - } -} diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventStrategyTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventStrategyTests.cs new file mode 100644 index 0000000..d2d69a0 --- /dev/null +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventStrategyTests.cs @@ -0,0 +1,99 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Ingest.Elasticsearch.Serialization; +using Elastic.Transport; +using FluentAssertions; +using Xunit; + +namespace Elastic.Ingest.Elasticsearch.Tests.Strategies; + +internal class TestIndexChannel(IndexChannelOptions options) : IndexChannel(options) +{ + public List Strategies { get; } = new(); + + protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(IndexDocument @event) + { + var strategy = base.EventIndexStrategy(@event); + Strategies.Add(new TrackStrategy + { + Id = @event.Id, + Strategy = strategy.Item1, + Header = strategy.Item2 + }); + return strategy; + } +} + +public class IndexChannelEventStrategyTests : ChannelTestWithSingleDocResponseBase +{ + [Fact] + public void EmitsExpectedStrategies() + { + ApiCallDetails callDetails = null; + + var wait = new ManualResetEvent(false); + + Exception exception = null; + using var channel = new TestIndexChannel(new IndexChannelOptions(Transport) + { + IndexFormat = "test-index", + BufferOptions = new() { OutboundBufferMaxSize = 6 }, + DynamicTemplateLookup = document => document.Id == 2 ? new Dictionary { { "id", "1" } } : null, + BulkOperationIdLookup = document => document.Id == 3 ? "33" : null, + BulkUpsertLookup = (document, _) => document.Id == 4, + RequireAlias = document => document.Id == 5, + ListExecutedPipelines = document => document.Id == 6, + ExportExceptionCallback = e => + { + exception = e; + wait.Set(); + }, + ExportResponseCallback = (response, _) => + { + callDetails = response.ApiCallDetails; + wait.Set(); + } + }); + + channel.TryWrite(new IndexDocument()); //0 + channel.TryWrite(new IndexDocument()); //1 + channel.TryWrite(new IndexDocument()); //2 + channel.TryWrite(new IndexDocument()); //3 + channel.TryWrite(new IndexDocument()); //4 + channel.TryWrite(new IndexDocument()); //5 + var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("because ExportResponseCallback should have been called"); + exception.Should().BeNull(); + + callDetails.Uri.AbsolutePath.Should().Be("/test-index/_bulk"); + + channel.Strategies.Should().HaveCount(6); + var strategies = channel.Strategies.OrderBy(s => s.Id).ToList(); + + strategies[0].Id.Should().Be(1); + strategies[0].Strategy.Should().Be(HeaderSerializationStrategy.CreateNoParams); + strategies[0].Header.Should().BeNull(); + + strategies[1].Strategy.Should().Be(HeaderSerializationStrategy.Create); + strategies[1].Header.Should().NotBeNull(); + strategies[1].Header!.Value.DynamicTemplates.Should().NotBeNull(); + + strategies[2].Strategy.Should().Be(HeaderSerializationStrategy.Create); + strategies[2].Header.Should().NotBeNull(); + strategies[2].Header!.Value.Id.Should().Be("33"); + + strategies[3].Strategy.Should().Be(HeaderSerializationStrategy.Update); + + strategies[4].Header!.Value.RequireAlias.Should().BeTrue(); + + strategies[5].Header!.Value.ListExecutedPipelines.Should().BeTrue(); + + } +} diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/TrackDocument.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/TrackDocument.cs new file mode 100644 index 0000000..aab0029 --- /dev/null +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/TrackDocument.cs @@ -0,0 +1,31 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using Elastic.Ingest.Elasticsearch.Serialization; + +namespace Elastic.Ingest.Elasticsearch.Tests.Strategies; + +public class IndexDocument +{ + private static int Counter; + + public DateTimeOffset Timestamp { get; set; } + public int Id { get; } = ++Counter; +} +public class DataStreamDocument +{ + private static int Counter; + + public DateTimeOffset Timestamp { get; set; } + public int Id { get; } = ++Counter; +} + +public class TrackStrategy +{ + public int Id { get; init; } + public HeaderSerializationStrategy Strategy { get; init; } + public BulkHeader? Header { get; init; } + +} From 61cf2000af37533111cb5b99a45a86396bbc5259 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Wed, 9 Oct 2024 13:57:51 +0200 Subject: [PATCH 09/10] clean up namespaces --- .../Strategies/DataStreamChannelEventStrategyTests.cs | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/DataStreamChannelEventStrategyTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/DataStreamChannelEventStrategyTests.cs index 706bce1..40356b1 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/DataStreamChannelEventStrategyTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/DataStreamChannelEventStrategyTests.cs @@ -4,12 +4,9 @@ using System; using System.Collections.Generic; -using System.IO; using System.Linq; using System.Threading; -using Elastic.Channels.Diagnostics; using Elastic.Ingest.Elasticsearch.DataStreams; -using Elastic.Ingest.Elasticsearch.Indices; using Elastic.Ingest.Elasticsearch.Serialization; using Elastic.Transport; using FluentAssertions; From 67bcd7b82de20a2d09a297e38b44a1335f1e0a45 Mon Sep 17 00:00:00 2001 From: Steve Gordon Date: Wed, 9 Oct 2024 14:16:39 +0200 Subject: [PATCH 10/10] Optimize serialization and update test coverage - DataStreamChannel.cs: Removed unnecessary BulkHeader creation. - Elastic.Ingest.Elasticsearch.csproj: Enabled unsafe blocks. - ElasticsearchChannelBase.Bytes.cs: Standardized IReadOnlyList handling and updated SerializeHeaderAsync. - ElasticsearchChannelBase.Serialization.cs: Added usings, optimized serialization for .NET 8.0, and added new methods. - BulkHeader.cs: Standardized comment punctuation. - Elastic.Ingest.Elasticsearch.Tests.csproj: Added net8.0 target framework. - IndexChannelTests.cs: Added new tests and enhanced ExecuteAndAssert method. --- .../DataStreams/DataStreamChannel.cs | 1 - .../Elastic.Ingest.Elasticsearch.csproj | 1 + .../ElasticsearchChannelBase.Bytes.cs | 6 +- .../ElasticsearchChannelBase.Serialization.cs | 308 +++++++++++++++++- .../Serialization/BulkHeader.cs | 2 +- .../Elastic.Ingest.Elasticsearch.Tests.csproj | 2 +- .../IndexChannelTests.cs | 30 +- 7 files changed, 332 insertions(+), 18 deletions(-) diff --git a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs index 1bebe12..d09a389 100644 --- a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs @@ -34,7 +34,6 @@ protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy var templates = Options.DynamicTemplateLookup?.Invoke(@event); if (templates is null && listExecutedPipelines is null or false) return (HeaderSerializationStrategy.CreateNoParams, null); - var header = new BulkHeader { DynamicTemplates = templates, diff --git a/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj b/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj index 730ec27..82d293b 100644 --- a/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj +++ b/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj @@ -7,6 +7,7 @@ latest enable True + true diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs index d1919eb..4125f92 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs @@ -41,8 +41,8 @@ public async Task WriteBufferToStreamAsync( #if NETSTANDARD2_1_OR_GREATER var items = page; #else - // needs cast prior to netstandard2.0 - IReadOnlyList items = page; + // needs cast prior to netstandard2.0 + IReadOnlyList items = page; #endif // for is okay on ArraySegment, foreach performs bad: // https://antao-almada.medium.com/how-to-use-span-t-and-memory-t-c0b126aae652 @@ -65,7 +65,7 @@ public async Task WriteBufferToStreamAsync( case HeaderSerializationStrategy.Create: case HeaderSerializationStrategy.Delete: case HeaderSerializationStrategy.Update: - await SerializeHeaderAsync(stream, ref header, SerializerOptions, ctx).ConfigureAwait(false); + await SerializeHeaderAsync(stream, op, ref header, ctx).ConfigureAwait(false); break; } diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs index d1a3ee5..8af99df 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs @@ -3,7 +3,12 @@ // See the LICENSE file in the project root for more information using System; +using System.Buffers; +using System.Collections.Generic; using System.IO; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices.ComTypes; +using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; @@ -20,27 +25,308 @@ namespace Elastic.Ingest.Elasticsearch; public abstract partial class ElasticsearchChannelBase where TChannelOptions : ElasticsearchChannelOptionsBase { - private static ReadOnlySpan PlainIndexBytesSpan => """ - {"index":{}} +#if NET8_0_OR_GREATER + private static ReadOnlySpan IndexPrefixBytesSpan => """{"index":{"""u8; + private static ReadOnlySpan CreatePrefixBytesSpan => """{"create":{"""u8; + private static ReadOnlySpan DeletePrefixBytesSpan => """{"delete":{"""u8; + private static ReadOnlySpan UpdatePrefixBytesSpan => """{"update":{"""u8; + private static ReadOnlySpan SuffixBytesSpan => """}}"""u8; - """u8; + private static ReadOnlySpan IdPropertyPrefixBytesSpan => "\"_id\":\""u8; + private static ReadOnlySpan IndexPropertyPrefixBytesSpan => "\"_index\":\""u8; + private static ReadOnlySpan ListExecutedPipelinesPropertyPrefixBytesSpan => "\"list_executed_pipelines\":"u8; + private static ReadOnlySpan RequireAliasPropertyPrefixBytesSpan => "\"require_alias\":"u8; + private static ReadOnlySpan DyanamicTemplatesPropertyPrefixBytesSpan => "\"dynamic_templates\":"u8; - private static ReadOnlySpan PlainCreateBytesSpan => """ - {"create":{}} + private static ReadOnlySpan TrueBytesSpan => "true"u8; + private static ReadOnlySpan FalseBytesSpan => "false"u8; - """u8; + private static ReadOnlySpan DoubleQuote => [(byte)'"']; + private static ReadOnlySpan Comma => [(byte)',']; -#if NETSTANDARD + private static ReadOnlySpan OpenSquare => [(byte)'[']; + private static ReadOnlySpan CloseSquare => [(byte)']']; + private static ReadOnlySpan OpenCurlyBrace => [(byte)'{']; + private static ReadOnlySpan CloseCurlyBrace => [(byte)'}']; + private static ReadOnlySpan Colon => [(byte)':']; +#endif - private static byte[] PlainIndexBytes => PlainIndexBytesSpan.ToArray(); + private static ReadOnlySpan PlainIndexBytesSpan => """{"index":{}}"""u8; + private static ReadOnlySpan PlainCreateBytesSpan => """{"create":{}}"""u8; +#if NETSTANDARD + private static byte[] PlainIndexBytes => PlainIndexBytesSpan.ToArray(); private static byte[] PlainCreateBytes => PlainCreateBytesSpan.ToArray(); + + private static readonly JsonEncodedText CreateOperation = JsonEncodedText.Encode("create"); + private static readonly JsonEncodedText UpdateOperation = JsonEncodedText.Encode("update"); + private static readonly JsonEncodedText IndexOperation = JsonEncodedText.Encode("index"); + private static readonly JsonEncodedText DeleteOperation = JsonEncodedText.Encode("delete"); + private static readonly JsonEncodedText IdProperty = JsonEncodedText.Encode("_id"); + private static readonly JsonEncodedText IndexProperty = JsonEncodedText.Encode("_index"); + private static readonly JsonEncodedText RequireAliasProperty = JsonEncodedText.Encode("require_alias"); + private static readonly JsonEncodedText ListExecutedPipelinesProperty = JsonEncodedText.Encode("list_executed_pipelines"); + private static readonly JsonEncodedText DynamicTemplatesProperty = JsonEncodedText.Encode("dynamic_templates"); #endif - private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader? header, JsonSerializerOptions serializerOptions, CancellationToken ctx - ) => - Task.CompletedTask; +#if NET8_0_OR_GREATER + [SkipLocalsInit] + private static ValueTask SerializeHeaderAsync(Stream stream, HeaderSerializationStrategy operation, ref readonly BulkHeader? header, CancellationToken ctx) + { + if (!header.HasValue) + { + switch (operation) + { + case HeaderSerializationStrategy.Index: + stream.Write(PlainIndexBytesSpan); + break; + case HeaderSerializationStrategy.Create: + stream.Write(PlainCreateBytesSpan); + break; + default: + throw new ArgumentException($"Expected non null value for {operation}.", nameof(header)); + } + + return ValueTask.CompletedTask; + } + + Span buffer = stackalloc byte[256]; + + switch (operation) + { + case HeaderSerializationStrategy.Index: + stream.Write(IndexPrefixBytesSpan); + break; + case HeaderSerializationStrategy.Create: + stream.Write(CreatePrefixBytesSpan); + break; + case HeaderSerializationStrategy.Delete: + stream.Write(DeletePrefixBytesSpan); + break; + case HeaderSerializationStrategy.Update: + stream.Write(UpdatePrefixBytesSpan); + break; + default: + throw new ArgumentException($"Unexpected operation {operation}."); + } + + var propertyCount = 0; + var headerValue = header.Value; + + if (!string.IsNullOrEmpty(headerValue.Index)) + WriteString(IndexPropertyPrefixBytesSpan, stream, headerValue.Index, buffer, ref propertyCount); + + if (!string.IsNullOrEmpty(headerValue.Id)) + WriteString(IdPropertyPrefixBytesSpan, stream, headerValue.Id, buffer, ref propertyCount); + + if (headerValue.RequireAlias.HasValue && headerValue.RequireAlias.Value) + { + WriteTrue(RequireAliasPropertyPrefixBytesSpan, stream, ref propertyCount); + } + else if (headerValue.RequireAlias.HasValue && !headerValue.RequireAlias.Value) + { + WriteFalse(RequireAliasPropertyPrefixBytesSpan, stream, ref propertyCount); + } + + if (headerValue.ListExecutedPipelines.HasValue && headerValue.ListExecutedPipelines.Value) + { + WriteTrue(ListExecutedPipelinesPropertyPrefixBytesSpan, stream, ref propertyCount); + } + else if (headerValue.ListExecutedPipelines.HasValue && !headerValue.ListExecutedPipelines.Value) + { + WriteFalse(ListExecutedPipelinesPropertyPrefixBytesSpan, stream, ref propertyCount); + } + + if (headerValue.DynamicTemplates is not null && headerValue.DynamicTemplates.Count > 0) + { + if (propertyCount > 0) + stream.Write(Comma); + + stream.Write(DyanamicTemplatesPropertyPrefixBytesSpan); + stream.Write(OpenSquare); + + var entryCount = 0; + foreach (var (key, value) in headerValue.DynamicTemplates) + { + WriteDictionaryEntry(stream, key, value, buffer, ref entryCount); + } + + stream.Write(CloseSquare); + propertyCount++; + } + + static void WriteString(ReadOnlySpan propertyNamePrefix, Stream stream, string value, Span buffer, ref int propertyCount) + { + if (propertyCount > 0) + stream.Write(Comma); + + stream.Write(propertyNamePrefix); // This includes the open quotes for the value + + var length = Encoding.UTF8.GetByteCount(value); + + if (length <= 256) + { + Encoding.UTF8.TryGetBytes(value, buffer, out var written); + stream.Write(buffer[..written]); + } + else + { + var rentedArray = ArrayPool.Shared.Rent(length); + Encoding.UTF8.TryGetBytes(value, rentedArray.AsSpan(), out var written); + stream.Write(buffer[..written]); + ArrayPool.Shared.Return(rentedArray); + } + + stream.Write(DoubleQuote); + propertyCount++; + } + + static void WriteTrue(ReadOnlySpan propertyNamePrefix, Stream stream, ref int propertyCount) + { + if (propertyCount > 0) + stream.Write(Comma); + + stream.Write(propertyNamePrefix); + stream.Write(TrueBytesSpan); + + propertyCount++; + } + + static void WriteFalse(ReadOnlySpan propertyNamePrefix, Stream stream, ref int propertyCount) + { + if (propertyCount > 0) + stream.Write(Comma); + + stream.Write(propertyNamePrefix); + stream.Write(FalseBytesSpan); + + propertyCount++; + } + static void WriteDictionaryEntry(Stream stream, string key, string value, Span buffer, ref int entryCount) + { + if (string.IsNullOrEmpty(key) || string.IsNullOrEmpty(value) || key.Length == 0 || value.Length == 0) + return; + + if (entryCount > 0) + stream.Write(Comma); + + stream.Write(OpenCurlyBrace); + WriteQuotedStringBytes(stream, key, buffer); + stream.Write(Colon); + WriteQuotedStringBytes(stream, value, buffer); + stream.Write(CloseCurlyBrace); + + entryCount++; + } + + static void WriteQuotedStringBytes(Stream stream, string value, Span buffer) + { + stream.Write(DoubleQuote); + var length = Encoding.UTF8.GetByteCount(value); + if (length <= 256) + { + Encoding.UTF8.TryGetBytes(value, buffer, out var written); + stream.Write(buffer[..written]); + } + else + { + var rentedArray = ArrayPool.Shared.Rent(length); + Encoding.UTF8.TryGetBytes(value, rentedArray.AsSpan(), out var written); + stream.Write(buffer[..written]); + ArrayPool.Shared.Return(rentedArray); + } + stream.Write(DoubleQuote); + } + + stream.Write(SuffixBytesSpan); + + return ValueTask.CompletedTask; + } +#else + private static Task SerializeHeaderAsync(Stream stream, HeaderSerializationStrategy operation, ref readonly BulkHeader? header, CancellationToken ctx) + { + if (!header.HasValue) + { + if (operation != HeaderSerializationStrategy.Index || operation != HeaderSerializationStrategy.Create) + throw new ArgumentException($"Expected non null value for {operation}.", nameof(header)); + + return HandleNullBulkHeaderAsync(stream, operation); + } + + var operationString = operation switch + { + HeaderSerializationStrategy.Create => CreateOperation, + HeaderSerializationStrategy.Delete => DeleteOperation, + HeaderSerializationStrategy.Index => IndexOperation, + HeaderSerializationStrategy.Update => UpdateOperation, + HeaderSerializationStrategy.IndexNoParams => throw new InvalidOperationException(), + HeaderSerializationStrategy.CreateNoParams => throw new InvalidOperationException(), + _ => throw new ArgumentOutOfRangeException(nameof(operation), operation, null) + }; + + var headerValue = header.Value; + + return SerializeHeaderAsync(stream, operationString, headerValue.Id, headerValue.Index, headerValue.RequireAlias, + headerValue.ListExecutedPipelines, headerValue.DynamicTemplates, ctx); + } + + private static async Task HandleNullBulkHeaderAsync(Stream stream, HeaderSerializationStrategy operation) + { + switch (operation) + { + case HeaderSerializationStrategy.Index: + await stream.WriteAsync(PlainIndexBytes, 0, PlainCreateBytes.Length).ConfigureAwait(false); + break; + case HeaderSerializationStrategy.Create: + await stream.WriteAsync(PlainCreateBytes, 0, PlainCreateBytes.Length).ConfigureAwait(false); + break; + } + } + + private static async Task SerializeHeaderAsync(Stream stream, JsonEncodedText operation, string? id, string? index, bool? requireAlias, + bool? listExecutedPipelines, IDictionary? dynamicTemplates, CancellationToken ctx) + { + var writer = new Utf8JsonWriter(stream, default); + + await using (writer.ConfigureAwait(false)) + { + writer.WriteStartObject(); + writer.WritePropertyName(operation); + writer.WriteStartObject(); + + if (!string.IsNullOrWhiteSpace(index)) + writer.WriteString(IndexProperty, index); + + if (!string.IsNullOrWhiteSpace(id)) + writer.WriteString(IdProperty, id); + + if (requireAlias.HasValue) + writer.WriteBoolean(RequireAliasProperty, requireAlias.Value); + + if (listExecutedPipelines.HasValue) + writer.WriteBoolean(ListExecutedPipelinesProperty, listExecutedPipelines.Value); + + if (dynamicTemplates is not null) + { + writer.WritePropertyName(DynamicTemplatesProperty); + writer.WriteStartArray(); + + foreach (var template in dynamicTemplates) + { + writer.WriteStartObject(); + writer.WriteString(template.Key, template.Value); + writer.WriteEndObject(); + } + + writer.WriteEndArray(); + } + + writer.WriteEndObject(); + writer.WriteEndObject(); + } + } +#endif #if NET8_0_OR_GREATER private static ValueTask SerializePlainIndexHeaderAsync(Stream stream, CancellationToken ctx = default) diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs index 6a3ee48..7cada19 100644 --- a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs +++ b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs @@ -22,7 +22,7 @@ public readonly struct BulkHeader /// /// A map from the full name of fields to the name of dynamic templates. Defaults to an empty map. If a name matches a dynamic template, /// then that template will be applied regardless of other match predicates defined in the template. And if a field is already defined - /// in the mapping, then this parameter won’t be used. + /// in the mapping, then this parameter won't be used. /// public IDictionary? DynamicTemplates { get; init; } diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/Elastic.Ingest.Elasticsearch.Tests.csproj b/tests/Elastic.Ingest.Elasticsearch.Tests/Elastic.Ingest.Elasticsearch.Tests.csproj index 230063d..4afc722 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/Elastic.Ingest.Elasticsearch.Tests.csproj +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/Elastic.Ingest.Elasticsearch.Tests.csproj @@ -1,7 +1,7 @@ - net6.0 + net6.0;net8.0 diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs index 7e1cbae..68aae42 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs @@ -3,6 +3,8 @@ // See the LICENSE file in the project root for more information using System; +using System.Collections; +using System.Collections.Generic; using System.IO; using System.Threading; using Elastic.Ingest.Elasticsearch.Indices; @@ -22,7 +24,18 @@ public void IndexChannel_WithFixedIndexName_UsesCorrectUrlAndOperationHeader() = public void IndexChannel_WithDynamicIndexName_UsesCorrectUrlAndOperationHeader() => ExecuteAndAssert("/_bulk", "{\"create\":{\"_index\":\"dotnet-2023.07.29\"}}"); - private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader, string indexName = null) + [Fact] + public void IndexChannel_UsesOperationAndId() => + ExecuteAndAssert("/_bulk", "{\"index\":{\"_index\":\"dotnet-2023.07.29\",\"_id\":\"mydocid\"}}", id: "mydocid", operationMode: OperationMode.Index); + + [Fact] + public void IndexChannel_WritesCorrectHeaderWithAllOptions() => + ExecuteAndAssert("/fixed-index/_bulk", "{\"create\":{\"_id\":\"mydocid\",\"require_alias\":true,\"list_executed_pipelines\":true,\"dynamic_templates\":[{\"key1\":\"value1\"}]}}", "fixed-index", id: "mydocid", operationMode: OperationMode.Create, + requiresAlias: true, listExecutedPipelines: true, dynamicTemplates: new Dictionary { { "key1", "value1"} }); + + private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader, + string indexName = null, string id = null, OperationMode? operationMode = null, bool? requiresAlias = null, + bool? listExecutedPipelines = null, IDictionary dynamicTemplates = null) { ApiCallDetails callDetails = null; @@ -48,6 +61,21 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader TimestampLookup = _ => new DateTimeOffset(2023, 07, 29, 20, 00, 00, TimeSpan.Zero), }; + if (operationMode.HasValue) + options.OperationMode = operationMode.Value; + + if (!string.IsNullOrEmpty(id)) + options.BulkOperationIdLookup = _ => id; + + if (requiresAlias.HasValue) + options.RequireAlias = _ => requiresAlias.Value; + + if (dynamicTemplates is not null) + options.DynamicTemplateLookup = _ => dynamicTemplates; + + if (listExecutedPipelines.HasValue) + options.ListExecutedPipelines = _ => listExecutedPipelines.Value; + if (indexName is not null) { options.IndexFormat = indexName;