diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs index 52fa94c..d8f71ed 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkIngestionBenchmarks.cs @@ -26,9 +26,6 @@ public class BulkIngestionBenchmarks [ParamsAllValues] public bool DisableDiagnostics { get; set; } - [ParamsAllValues] - public bool UseReadOnlyMemory { get; set; } - [GlobalSetup] public void Setup() { @@ -47,7 +44,6 @@ public void Setup() OutboundBufferMaxSize = MaxExportSize }, DisableDiagnostics = DisableDiagnostics, - UseReadOnlyMemory = UseReadOnlyMemory, IndexFormat = "stock-data-v8", OutboundChannelExitedCallback = () => _waitHandle.Set(), #if DEBUG diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs index ef2b2f0..087918d 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs @@ -16,7 +16,6 @@ public class BulkRequestCreationForDataStreamBenchmarks private ITransport? _transport; private TransportConfiguration? _transportConfiguration; private StockData[] _data = Array.Empty(); - private readonly BulkOperationHeader _bulkOperationHeader = new CreateOperation(); public Stream MemoryStream { get; } = new MemoryStream(); @@ -40,12 +39,12 @@ public void Setup() _data = StockData.CreateSampleData(DocumentsToIndex); } - [Benchmark(Baseline = true)] - public async Task WriteToStreamAsync() - { - MemoryStream.Position = 0; - var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, _ => _bulkOperationHeader); - var postData = PostData.ReadOnlyMemory(bytes); - await postData.WriteAsync(MemoryStream, _transportConfiguration!, false, CancellationToken.None); - } + //[Benchmark(Baseline = true)] + //public async Task WriteToStreamAsync() + //{ + // MemoryStream.Position = 0; + // var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, _ => _bulkOperationHeader); + // var postData = PostData.ReadOnlyMemory(bytes); + // await postData.WriteAsync(MemoryStream, _transportConfiguration!, false, CancellationToken.None); + //} } diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithFixedIndexNameBenchmarks.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithFixedIndexNameBenchmarks.cs index 1b54fab..2457266 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithFixedIndexNameBenchmarks.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithFixedIndexNameBenchmarks.cs @@ -39,12 +39,12 @@ public void Setup() _data = StockData.CreateSampleData(DocumentsToIndex); } - [Benchmark(Baseline = true)] - public async Task WriteToStreamAsync() - { - MemoryStream.Position = 0; - var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, true)); - var postData = PostData.ReadOnlyMemory(bytes); - await postData.WriteAsync(MemoryStream, _transportConfiguration!, false, CancellationToken.None); - } + // [Benchmark(Baseline = true)] + // public async Task WriteToStreamAsync() + // { + // MemoryStream.Position = 0; + // var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, true)); + // var postData = PostData.ReadOnlyMemory(bytes); + // await postData.WriteAsync(MemoryStream, _transportConfiguration!, false, CancellationToken.None); + // } } diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithTemplatedIndexNameBenchmarks.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithTemplatedIndexNameBenchmarks.cs index e9ea4d1..bd5cb3d 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithTemplatedIndexNameBenchmarks.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithTemplatedIndexNameBenchmarks.cs @@ -38,12 +38,12 @@ public void Setup() _data = StockData.CreateSampleData(DocumentsToIndex); } - [Benchmark(Baseline = true)] - public async Task DynamicIndexName_WriteToStreamAsync() - { - MemoryStream.Position = 0; - var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, false)); - var postData = PostData.ReadOnlyMemory(bytes); - await postData.WriteAsync(MemoryStream, _transportConfiguration!, false, CancellationToken.None); - } + // [Benchmark(Baseline = true)] + // public async Task DynamicIndexName_WriteToStreamAsync() + // { + // MemoryStream.Position = 0; + // var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, false)); + // var postData = PostData.ReadOnlyMemory(bytes); + // await postData.WriteAsync(MemoryStream, _transportConfiguration!, false, CancellationToken.None); + // } } diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs index 5494e1e..2e7e9d4 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs @@ -25,7 +25,7 @@ var bm = new BulkRequestCreationWithFixedIndexNameBenchmarks(); bm.Setup(); -await bm.WriteToStreamAsync(); +//await bm.WriteToStreamAsync(); var length = bm.MemoryStream.Length; diff --git a/src/Elastic.Channels/BufferedChannelBase.cs b/src/Elastic.Channels/BufferedChannelBase.cs index 09494ff..673b47f 100644 --- a/src/Elastic.Channels/BufferedChannelBase.cs +++ b/src/Elastic.Channels/BufferedChannelBase.cs @@ -123,26 +123,26 @@ public abstract class BufferedChannelBase internal InboundBuffer InboundBuffer { get; } /// - protected BufferedChannelBase(TChannelOptions options) : this(options, null) { } + protected BufferedChannelBase(TChannelOptions options, string diagnosticsName) : this(options, null, diagnosticsName) { } /// - protected BufferedChannelBase(TChannelOptions options, ICollection>? callbackListeners) + protected BufferedChannelBase(TChannelOptions options, ICollection>? callbackListeners, string diagnosticsName) { TokenSource = options.CancellationToken.HasValue ? CancellationTokenSource.CreateLinkedTokenSource(options.CancellationToken.Value) : new CancellationTokenSource(); Options = options; - var listeners = callbackListeners == null ? new[] { Options } : callbackListeners.Concat(new[] { Options }).ToArray(); + var listeners = callbackListeners == null ? [Options] : callbackListeners.Concat([Options]).ToArray(); DiagnosticsListener = listeners - .Select(l => (l is IChannelDiagnosticsListener c) ? c : null) - .FirstOrDefault(e => e != null); + .OfType() + .FirstOrDefault(); if (DiagnosticsListener == null && !options.DisableDiagnostics) { // if no debug listener was already provided but was requested explicitly create one. - var l = new ChannelDiagnosticsListener(GetType().Name); + var l = new ChannelDiagnosticsListener(diagnosticsName); DiagnosticsListener = l; - listeners = listeners.Concat(new[] { l }).ToArray(); + listeners = listeners.Concat([l]).ToArray(); } _callbacks = new ChannelCallbackInvoker(listeners); @@ -311,7 +311,11 @@ private async Task ConsumeOutboundEventsAsync() } } await Task.WhenAll(_taskList).ConfigureAwait(false); +#if NET8_0_OR_GREATER + await _exitCancelSource.CancelAsync().ConfigureAwait(false); +#else _exitCancelSource.Cancel(); +#endif _callbacks.OutboundChannelExitedCallback?.Invoke(); } @@ -443,7 +447,7 @@ async Task AsyncSlowPathAsync(IOutboundBuffer b) } /// > - public override string ToString() + public override string? ToString() { if (DiagnosticsListener == null) return base.ToString(); var sb = new StringBuilder(); diff --git a/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs index 4c461bf..c2adaeb 100644 --- a/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs +++ b/src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs @@ -38,14 +38,14 @@ public class NoopChannelOptions : ChannelOptionsBase public NoopBufferedChannel( NoopChannelOptions options, ICollection>? channelListeners = null - ) : base(options, channelListeners) { } + ) : base(options, channelListeners, nameof(NoopBufferedChannel)) { } /// public NoopBufferedChannel( BufferOptions options, ICollection>? channelListeners = null, bool observeConcurrency = false - ) : base(new NoopChannelOptions { BufferOptions = options, TrackConcurrency = observeConcurrency }, channelListeners) + ) : base(new NoopChannelOptions { BufferOptions = options, TrackConcurrency = observeConcurrency }, channelListeners, nameof(NoopBufferedChannel)) { } diff --git a/src/Elastic.Channels/Elastic.Channels.csproj b/src/Elastic.Channels/Elastic.Channels.csproj index 0b4e675..48e3c2a 100644 --- a/src/Elastic.Channels/Elastic.Channels.csproj +++ b/src/Elastic.Channels/Elastic.Channels.csproj @@ -1,7 +1,7 @@ - netstandard2.0;netstandard2.1 + netstandard2.0;netstandard2.1;net8.0 Provides components to build a buffer-backed channel that flushes batches of data in a controlled (Max N || Max Duration) manner. elastic, channels, buffer latest diff --git a/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs b/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs index 0208ff2..c90111e 100644 --- a/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs +++ b/src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs @@ -33,12 +33,12 @@ public abstract class ResponseItemsBufferedChannelBase - protected ResponseItemsBufferedChannelBase(TChannelOptions options, ICollection>? callbackListeners) - : base(options, callbackListeners) { } + protected ResponseItemsBufferedChannelBase(TChannelOptions options, ICollection>? callbackListeners, string diagnosticsName) + : base(options, callbackListeners, diagnosticsName) { } /// - protected ResponseItemsBufferedChannelBase(TChannelOptions options) - : base(options) { } + protected ResponseItemsBufferedChannelBase(TChannelOptions options, string diagnosticsName) + : base(options, diagnosticsName) { } /// Based on should return a bool indicating if retry is needed protected abstract bool Retry(TResponse response); diff --git a/src/Elastic.Ingest.Apm/ApmChannel.cs b/src/Elastic.Ingest.Apm/ApmChannel.cs index d9125ba..761f72a 100644 --- a/src/Elastic.Ingest.Apm/ApmChannel.cs +++ b/src/Elastic.Ingest.Apm/ApmChannel.cs @@ -22,8 +22,8 @@ namespace Elastic.Ingest.Apm; /// public class ApmChannel : TransportChannelBase { -/// - public ApmChannel(ApmChannelOptions options) : base(options) { } + /// + public ApmChannel(ApmChannelOptions options) : base(options, nameof(ApmChannel)) { } //retry if APM server returns 429 /// diff --git a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs index f8eb85e..eb20d91 100644 --- a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs @@ -14,24 +14,34 @@ namespace Elastic.Ingest.Elasticsearch.DataStreams; public class DataStreamChannel : ElasticsearchChannelBase> where TEvent : class { - private readonly CreateOperation _fixedHeader; private readonly string _url; /// public DataStreamChannel(DataStreamChannelOptions options) : this(options, null) { } /// - public DataStreamChannel(DataStreamChannelOptions options, ICollection>? callbackListeners) : base(options, callbackListeners) + public DataStreamChannel(DataStreamChannelOptions options, ICollection>? callbackListeners, string? diagnosticsName = null) + : base(options, callbackListeners, diagnosticsName ?? nameof(DataStreamChannel)) { var dataStream = Options.DataStream.ToString(); _url = $"{dataStream}/{base.BulkPathAndQuery}"; - - _fixedHeader = new CreateOperation(); } - /// - protected override BulkOperationHeader CreateBulkOperationHeader(TEvent @event) => _fixedHeader; + /// + protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TEvent @event) + { + var listExecutedPipelines = Options.ListExecutedPipelines?.Invoke(@event); + var templates = Options.DynamicTemplateLookup?.Invoke(@event); + if (templates is null && listExecutedPipelines is null or false) + return (HeaderSerializationStrategy.CreateNoParams, null); + var header = new BulkHeader + { + DynamicTemplates = templates, + ListExecutedPipelines = listExecutedPipelines + }; + return (HeaderSerializationStrategy.Create, header); + } /// protected override string TemplateName => Options.DataStream.GetTemplateName(); diff --git a/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj b/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj index 6e537ed..82d293b 100644 --- a/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj +++ b/src/Elastic.Ingest.Elasticsearch/Elastic.Ingest.Elasticsearch.csproj @@ -1,12 +1,13 @@ - netstandard2.0;netstandard2.1 + netstandard2.0;netstandard2.1;net8.0 Offers an easy to use ChannelWriter implementation to push data concurrently to Elasticsearch using Elastic.Transport elastic, channels, elasticsearch, ingest latest enable True + true diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs new file mode 100644 index 0000000..ed2585a --- /dev/null +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs @@ -0,0 +1,90 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.IO; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Elastic.Ingest.Elasticsearch.DataStreams; +using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Ingest.Elasticsearch.Serialization; +using Elastic.Ingest.Transport; +using static Elastic.Ingest.Elasticsearch.ElasticsearchChannelStatics; + +namespace Elastic.Ingest.Elasticsearch; + +/// +/// An abstract base class for both and +/// Coordinates most of the sending to- and bootstrapping of Elasticsearch +/// +public abstract partial class ElasticsearchChannelBase + where TChannelOptions : ElasticsearchChannelOptionsBase + where TEvent : class +{ + /// TODO + protected abstract (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TEvent @event); + + /// + /// Asynchronously write the NDJSON request body for a page of events to . + /// + /// A page of events. + /// The target for the request. + /// The for the channel where the request will be written. + /// The cancellation token to cancel operation. + /// + public async Task WriteBufferToStreamAsync( + ArraySegment page, Stream stream, ElasticsearchChannelOptionsBase options, CancellationToken ctx = default) + { +#if NETSTANDARD2_1_OR_GREATER + var items = page; +#else + // needs cast prior to netstandard2.0 + IReadOnlyList items = page; +#endif + // for is okay on ArraySegment, foreach performs bad: + // https://antao-almada.medium.com/how-to-use-span-t-and-memory-t-c0b126aae652 + // ReSharper disable once ForCanBeConvertedToForeach + for (var i = 0; i < items.Count; i++) + { + var @event = items[i]; + if (@event == null) continue; + + var (op, header) = EventIndexStrategy(@event); + switch (op) + { + case HeaderSerializationStrategy.IndexNoParams: + await SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false); + break; + case HeaderSerializationStrategy.CreateNoParams: + await SerializePlainCreateHeaderAsync(stream, ctx).ConfigureAwait(false); + break; + case HeaderSerializationStrategy.Index: + case HeaderSerializationStrategy.Create: + case HeaderSerializationStrategy.Delete: + case HeaderSerializationStrategy.Update: + await SerializeHeaderAsync(stream, op, ref header, ctx).ConfigureAwait(false); + break; + } + + await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false); + + if (op == HeaderSerializationStrategy.Update) + await stream.WriteAsync(DocUpdateHeaderStart, 0, DocUpdateHeaderStart.Length, ctx).ConfigureAwait(false); + + if (options.EventWriter?.WriteToStreamAsync != null) + await options.EventWriter.WriteToStreamAsync(stream, @event, ctx).ConfigureAwait(false); + else + await JsonSerializer.SerializeAsync(stream, @event, SerializerOptions, ctx) + .ConfigureAwait(false); + + if (op == HeaderSerializationStrategy.Update) + await stream.WriteAsync(DocUpdateHeaderEnd, 0, DocUpdateHeaderEnd.Length, ctx).ConfigureAwait(false); + + await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false); + } + } +} diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs new file mode 100644 index 0000000..377f1bc --- /dev/null +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs @@ -0,0 +1,353 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.IO; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices.ComTypes; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Elastic.Ingest.Elasticsearch.DataStreams; +using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Ingest.Elasticsearch.Serialization; + +namespace Elastic.Ingest.Elasticsearch; + +/// +/// An abstract base class for both and +/// Coordinates most of the sending to- and bootstrapping of Elasticsearch +/// +public abstract partial class ElasticsearchChannelBase + where TChannelOptions : ElasticsearchChannelOptionsBase + where TEvent : class +{ +#if NET8_0_OR_GREATER + private static ReadOnlySpan IndexPrefixBytesSpan => """{"index":{"""u8; + private static ReadOnlySpan CreatePrefixBytesSpan => """{"create":{"""u8; + private static ReadOnlySpan DeletePrefixBytesSpan => """{"delete":{"""u8; + private static ReadOnlySpan UpdatePrefixBytesSpan => """{"update":{"""u8; + private static ReadOnlySpan SuffixBytesSpan => """}}"""u8; + + private static ReadOnlySpan IdPropertyPrefixBytesSpan => "\"_id\":\""u8; + private static ReadOnlySpan IndexPropertyPrefixBytesSpan => "\"_index\":\""u8; + private static ReadOnlySpan ListExecutedPipelinesPropertyPrefixBytesSpan => "\"list_executed_pipelines\":"u8; + private static ReadOnlySpan RequireAliasPropertyPrefixBytesSpan => "\"require_alias\":"u8; + private static ReadOnlySpan DyanamicTemplatesPropertyPrefixBytesSpan => "\"dynamic_templates\":"u8; + + private static ReadOnlySpan TrueBytesSpan => "true"u8; + private static ReadOnlySpan FalseBytesSpan => "false"u8; + + private static ReadOnlySpan DoubleQuote => [(byte)'"']; + private static ReadOnlySpan Comma => [(byte)',']; + + private static ReadOnlySpan OpenSquare => [(byte)'[']; + private static ReadOnlySpan CloseSquare => [(byte)']']; + private static ReadOnlySpan OpenCurlyBrace => [(byte)'{']; + private static ReadOnlySpan CloseCurlyBrace => [(byte)'}']; + private static ReadOnlySpan Colon => [(byte)':']; +#endif + + private static ReadOnlySpan PlainIndexBytesSpan => """{"index":{}}"""u8; + private static ReadOnlySpan PlainCreateBytesSpan => """{"create":{}}"""u8; + +#if NETSTANDARD + private static byte[] PlainIndexBytes => PlainIndexBytesSpan.ToArray(); + private static byte[] PlainCreateBytes => PlainCreateBytesSpan.ToArray(); + + private static readonly JsonEncodedText CreateOperation = JsonEncodedText.Encode("create"); + private static readonly JsonEncodedText UpdateOperation = JsonEncodedText.Encode("update"); + private static readonly JsonEncodedText IndexOperation = JsonEncodedText.Encode("index"); + private static readonly JsonEncodedText DeleteOperation = JsonEncodedText.Encode("delete"); + private static readonly JsonEncodedText IdProperty = JsonEncodedText.Encode("_id"); + private static readonly JsonEncodedText IndexProperty = JsonEncodedText.Encode("_index"); + private static readonly JsonEncodedText RequireAliasProperty = JsonEncodedText.Encode("require_alias"); + private static readonly JsonEncodedText ListExecutedPipelinesProperty = JsonEncodedText.Encode("list_executed_pipelines"); + private static readonly JsonEncodedText DynamicTemplatesProperty = JsonEncodedText.Encode("dynamic_templates"); +#endif + +#if NET8_0_OR_GREATER + [SkipLocalsInit] + private static ValueTask SerializeHeaderAsync(Stream stream, HeaderSerializationStrategy operation, ref readonly BulkHeader? header, CancellationToken ctx) + { + if (!header.HasValue) + { + switch (operation) + { + case HeaderSerializationStrategy.Index: + stream.Write(PlainIndexBytesSpan); + break; + case HeaderSerializationStrategy.Create: + stream.Write(PlainCreateBytesSpan); + break; + default: + throw new ArgumentException($"Expected non null value for {operation}.", nameof(header)); + } + + return ValueTask.CompletedTask; + } + + Span buffer = stackalloc byte[256]; + + switch (operation) + { + case HeaderSerializationStrategy.Index: + stream.Write(IndexPrefixBytesSpan); + break; + case HeaderSerializationStrategy.Create: + stream.Write(CreatePrefixBytesSpan); + break; + case HeaderSerializationStrategy.Delete: + stream.Write(DeletePrefixBytesSpan); + break; + case HeaderSerializationStrategy.Update: + stream.Write(UpdatePrefixBytesSpan); + break; + default: + throw new ArgumentException($"Unexpected operation {operation}."); + } + + var propertyCount = 0; + var headerValue = header.Value; + + if (!string.IsNullOrEmpty(headerValue.Index)) + WriteString(IndexPropertyPrefixBytesSpan, stream, headerValue.Index, buffer, ref propertyCount); + + if (!string.IsNullOrEmpty(headerValue.Id)) + WriteString(IdPropertyPrefixBytesSpan, stream, headerValue.Id, buffer, ref propertyCount); + + if (headerValue.RequireAlias.HasValue && headerValue.RequireAlias.Value) + { + WriteTrue(RequireAliasPropertyPrefixBytesSpan, stream, ref propertyCount); + } + else if (headerValue.RequireAlias.HasValue && !headerValue.RequireAlias.Value) + { + WriteFalse(RequireAliasPropertyPrefixBytesSpan, stream, ref propertyCount); + } + + if (headerValue.ListExecutedPipelines.HasValue && headerValue.ListExecutedPipelines.Value) + { + WriteTrue(ListExecutedPipelinesPropertyPrefixBytesSpan, stream, ref propertyCount); + } + else if (headerValue.ListExecutedPipelines.HasValue && !headerValue.ListExecutedPipelines.Value) + { + WriteFalse(ListExecutedPipelinesPropertyPrefixBytesSpan, stream, ref propertyCount); + } + + if (headerValue.DynamicTemplates is not null && headerValue.DynamicTemplates.Count > 0) + { + if (propertyCount > 0) + stream.Write(Comma); + + stream.Write(DyanamicTemplatesPropertyPrefixBytesSpan); + stream.Write(OpenSquare); + + var entryCount = 0; + foreach (var (key, value) in headerValue.DynamicTemplates) + { + WriteDictionaryEntry(stream, key, value, buffer, ref entryCount); + } + + stream.Write(CloseSquare); + propertyCount++; + } + + static void WriteString(ReadOnlySpan propertyNamePrefix, Stream stream, string value, Span buffer, ref int propertyCount) + { + if (propertyCount > 0) + stream.Write(Comma); + + stream.Write(propertyNamePrefix); // This includes the open quotes for the value + + var length = Encoding.UTF8.GetByteCount(value); + + if (length <= 256) + { + Encoding.UTF8.TryGetBytes(value, buffer, out var written); + stream.Write(buffer[..written]); + } + else + { + var rentedArray = ArrayPool.Shared.Rent(length); + Encoding.UTF8.TryGetBytes(value, rentedArray.AsSpan(), out var written); + stream.Write(buffer[..written]); + ArrayPool.Shared.Return(rentedArray); + } + + stream.Write(DoubleQuote); + propertyCount++; + } + + static void WriteTrue(ReadOnlySpan propertyNamePrefix, Stream stream, ref int propertyCount) + { + if (propertyCount > 0) + stream.Write(Comma); + + stream.Write(propertyNamePrefix); + stream.Write(TrueBytesSpan); + + propertyCount++; + } + + static void WriteFalse(ReadOnlySpan propertyNamePrefix, Stream stream, ref int propertyCount) + { + if (propertyCount > 0) + stream.Write(Comma); + + stream.Write(propertyNamePrefix); + stream.Write(FalseBytesSpan); + + propertyCount++; + } + + static void WriteDictionaryEntry(Stream stream, string key, string value, Span buffer, ref int entryCount) + { + if (string.IsNullOrEmpty(key) || string.IsNullOrEmpty(value) || key.Length == 0 || value.Length == 0) + return; + + if (entryCount > 0) + stream.Write(Comma); + + stream.Write(OpenCurlyBrace); + WriteQuotedStringBytes(stream, key, buffer); + stream.Write(Colon); + WriteQuotedStringBytes(stream, value, buffer); + stream.Write(CloseCurlyBrace); + + entryCount++; + } + + static void WriteQuotedStringBytes(Stream stream, string value, Span buffer) + { + stream.Write(DoubleQuote); + var length = Encoding.UTF8.GetByteCount(value); + if (length <= 256) + { + Encoding.UTF8.TryGetBytes(value, buffer, out var written); + stream.Write(buffer[..written]); + } + else + { + var rentedArray = ArrayPool.Shared.Rent(length); + Encoding.UTF8.TryGetBytes(value, rentedArray.AsSpan(), out var written); + stream.Write(buffer[..written]); + ArrayPool.Shared.Return(rentedArray); + } + stream.Write(DoubleQuote); + } + + stream.Write(SuffixBytesSpan); + + return ValueTask.CompletedTask; + } +#else + private static Task SerializeHeaderAsync(Stream stream, HeaderSerializationStrategy operation, ref readonly BulkHeader? header, CancellationToken ctx) + { + if (!header.HasValue) + { + if (operation != HeaderSerializationStrategy.Index || operation != HeaderSerializationStrategy.Create) + throw new ArgumentException($"Expected non null value for {operation}.", nameof(header)); + + return HandleNullBulkHeaderAsync(stream, operation); + } + + var operationString = operation switch + { + HeaderSerializationStrategy.Create => CreateOperation, + HeaderSerializationStrategy.Delete => DeleteOperation, + HeaderSerializationStrategy.Index => IndexOperation, + HeaderSerializationStrategy.Update => UpdateOperation, + HeaderSerializationStrategy.IndexNoParams => throw new InvalidOperationException(), + HeaderSerializationStrategy.CreateNoParams => throw new InvalidOperationException(), + _ => throw new ArgumentOutOfRangeException(nameof(operation), operation, null) + }; + + var headerValue = header.Value; + + return SerializeHeaderAsync(stream, operationString, headerValue.Id, headerValue.Index, headerValue.RequireAlias, + headerValue.ListExecutedPipelines, headerValue.DynamicTemplates, ctx); + } + + private static async Task HandleNullBulkHeaderAsync(Stream stream, HeaderSerializationStrategy operation) + { + switch (operation) + { + case HeaderSerializationStrategy.Index: + await stream.WriteAsync(PlainIndexBytes, 0, PlainCreateBytes.Length).ConfigureAwait(false); + break; + case HeaderSerializationStrategy.Create: + await stream.WriteAsync(PlainCreateBytes, 0, PlainCreateBytes.Length).ConfigureAwait(false); + break; + } + } + + private static async Task SerializeHeaderAsync(Stream stream, JsonEncodedText operation, string? id, string? index, bool? requireAlias, + bool? listExecutedPipelines, IDictionary? dynamicTemplates, CancellationToken ctx) + { + var writer = new Utf8JsonWriter(stream, default); + + await using (writer.ConfigureAwait(false)) + { + writer.WriteStartObject(); + writer.WritePropertyName(operation); + writer.WriteStartObject(); + + if (!string.IsNullOrWhiteSpace(index)) + writer.WriteString(IndexProperty, index); + + if (!string.IsNullOrWhiteSpace(id)) + writer.WriteString(IdProperty, id); + + if (requireAlias.HasValue) + writer.WriteBoolean(RequireAliasProperty, requireAlias.Value); + + if (listExecutedPipelines.HasValue) + writer.WriteBoolean(ListExecutedPipelinesProperty, listExecutedPipelines.Value); + + if (dynamicTemplates is not null) + { + writer.WritePropertyName(DynamicTemplatesProperty); + writer.WriteStartArray(); + + foreach (var template in dynamicTemplates) + { + writer.WriteStartObject(); + writer.WriteString(template.Key, template.Value); + writer.WriteEndObject(); + } + + writer.WriteEndArray(); + } + + writer.WriteEndObject(); + writer.WriteEndObject(); + } + } +#endif + +#if NET8_0_OR_GREATER + private static ValueTask SerializePlainIndexHeaderAsync(Stream stream, CancellationToken ctx = default) + { + stream.Write(PlainIndexBytesSpan); + return ValueTask.CompletedTask; + } +#else + private static async ValueTask SerializePlainIndexHeaderAsync(Stream stream, CancellationToken ctx) => + await stream.WriteAsync(PlainIndexBytes, 0, PlainIndexBytes.Length, ctx).ConfigureAwait(false); +#endif + +#if NET8_0_OR_GREATER + private static ValueTask SerializePlainCreateHeaderAsync(Stream stream, CancellationToken ctx = default) + { + stream.Write(PlainCreateBytesSpan); + return ValueTask.CompletedTask; + } +#else + private static async ValueTask SerializePlainCreateHeaderAsync(Stream stream, CancellationToken ctx) => + await stream.WriteAsync(PlainCreateBytes, 0, PlainCreateBytes.Length, ctx).ConfigureAwait(false); +#endif +} diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs index 68adfcc..ffe2c6f 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs @@ -29,12 +29,12 @@ public abstract partial class ElasticsearchChannelBase where TEvent : class { /// - protected ElasticsearchChannelBase(TChannelOptions options, ICollection>? callbackListeners) - : base(options, callbackListeners) { } + protected ElasticsearchChannelBase(TChannelOptions options, ICollection>? callbackListeners, string diagnosticsName) + : base(options, callbackListeners, diagnosticsName) { } /// - protected ElasticsearchChannelBase(TChannelOptions options) - : base(options) { } + protected ElasticsearchChannelBase(TChannelOptions options, string diagnosticsName) + : base(options, diagnosticsName) { } /// protected override bool Retry(BulkResponse response) @@ -69,39 +69,22 @@ protected override bool RejectEvent((TEvent, BulkResponseItem) @event) => protected override Task ExportAsync(ITransport transport, ArraySegment page, CancellationToken ctx = default) { ctx = ctx == default ? TokenSource.Token : ctx; -#if NETSTANDARD2_1 - // Option is obsolete to prevent external users to set it. -#pragma warning disable CS0618 - if (Options.UseReadOnlyMemory) -#pragma warning restore CS0618 - { - var bytes = BulkRequestDataFactory.GetBytes(page, Options, CreateBulkOperationHeader); - return transport.RequestAsync(HttpMethod.POST, BulkPathAndQuery, PostData.ReadOnlyMemory(bytes), ctx); - } -#endif -#pragma warning disable IDE0022 // Use expression body for method - return transport.RequestAsync(new (HttpMethod.POST, BulkPathAndQuery), + return transport.RequestAsync(HttpMethod.POST, BulkPathAndQuery, PostData.StreamHandler(page, (_, _) => { - /* NOT USED */ + /* Synchronous code path never called */ }, - async (b, stream, ctx) => { await BulkRequestDataFactory.WriteBufferToStreamAsync(b, stream, Options, CreateBulkOperationHeader, ctx).ConfigureAwait(false); }) + async (b, stream, t) => await WriteBufferToStreamAsync(b, stream, Options, t).ConfigureAwait(false)) , ctx); -#pragma warning restore IDE0022 // Use expression body for method } - /// - /// Asks implementations to create a based on the being exported. - /// - protected abstract BulkOperationHeader CreateBulkOperationHeader(TEvent @event); - /// - protected class HeadIndexTemplateResponse : ElasticsearchResponse { } + protected class HeadIndexTemplateResponse : ElasticsearchResponse; /// - protected class PutIndexTemplateResponse : ElasticsearchResponse { } + protected class PutIndexTemplateResponse : ElasticsearchResponse; /// - protected class PutComponentTemplateResponse : ElasticsearchResponse { } + protected class PutComponentTemplateResponse : ElasticsearchResponse; } diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs index 6cc6d67..3d3b52b 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelOptionsBase.cs @@ -3,6 +3,8 @@ // See the LICENSE file in the project root for more information using System; +using System.Collections; +using System.Collections.Generic; using Elastic.Ingest.Elasticsearch.Serialization; using Elastic.Ingest.Transport; using Elastic.Transport; @@ -22,18 +24,10 @@ protected ElasticsearchChannelOptionsBase(ITransport transport) : base(transport /// public IElasticsearchEventWriter? EventWriter { get; set; } - #if NETSTANDARD2_1_OR_GREATER - /// - /// Expert option, - /// This will eagerly serialize to and use . - /// If false (default) the channel will use to directly write to the stream. - /// - #else - /// - /// Expert option, only available in netstandard2.1+ compatible runtimes to evaluate serialization approaches - /// - #endif - [Obsolete("Temporary exposed expert option, used to evaluate two different approaches to serialization")] - public bool UseReadOnlyMemory { get; set; } + /// Optionally set dynamic templates for event + public Func?>? DynamicTemplateLookup { get; set; } + + /// If true, the response will include the ingest pipelines that were executed. Defaults to false. + public Func? ListExecutedPipelines { get; set; } } diff --git a/src/Elastic.Ingest.Elasticsearch/IElasticsearchEventWriter.cs b/src/Elastic.Ingest.Elasticsearch/IElasticsearchEventWriter.cs index cf1eadc..5525605 100644 --- a/src/Elastic.Ingest.Elasticsearch/IElasticsearchEventWriter.cs +++ b/src/Elastic.Ingest.Elasticsearch/IElasticsearchEventWriter.cs @@ -16,16 +16,6 @@ namespace Elastic.Ingest.Elasticsearch; /// public interface IElasticsearchEventWriter { -#if NETSTANDARD2_1_OR_GREATER - /// - /// Provide a custom routine to write to an ArrayBufferWriter{T} - /// This implementation is only called if is true, defaults to false - /// Otherwise is called instead. - /// If `null` will fallback to its own internal implementation. - /// - Action, TEvent>? WriteToArrayBuffer { get; set; } -#endif - /// /// Provide a custom routine to write to a asynchronously /// If `null` will fallback to its internal implementation. diff --git a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs index e4dc649..9e202c5 100644 --- a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs @@ -7,6 +7,7 @@ using Elastic.Ingest.Elasticsearch.DataStreams; using Elastic.Ingest.Elasticsearch.Serialization; using Elastic.Ingest.Transport; +using static Elastic.Ingest.Elasticsearch.Serialization.HeaderSerializationStrategy; namespace Elastic.Ingest.Elasticsearch.Indices; @@ -23,7 +24,8 @@ public class IndexChannel : ElasticsearchChannelBase options) : this(options, null) { } /// - public IndexChannel(IndexChannelOptions options, ICollection>? callbackListeners) : base(options, callbackListeners) + public IndexChannel(IndexChannelOptions options, ICollection>? callbackListeners, string? diagnosticsName = null) + : base(options, callbackListeners, diagnosticsName ?? nameof(IndexChannel)) { _url = base.BulkPathAndQuery; @@ -42,8 +44,44 @@ public IndexChannel(IndexChannelOptions options, ICollection protected override string BulkPathAndQuery => _url; - /// - protected override BulkOperationHeader CreateBulkOperationHeader(TEvent @event) => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(@event, Options, _skipIndexNameOnOperations); + /// + protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TEvent @event) + { + var indexTime = Options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now; + if (Options.IndexOffset.HasValue) indexTime = indexTime.ToOffset(Options.IndexOffset.Value); + + var index = _skipIndexNameOnOperations ? string.Empty : string.Format(Options.IndexFormat, indexTime); + var id = Options.BulkOperationIdLookup?.Invoke(@event); + var templates = Options.DynamicTemplateLookup?.Invoke(@event); + var requireAlias = Options.RequireAlias?.Invoke(@event); + var listExecutedPipelines = Options.ListExecutedPipelines?.Invoke(@event); + var isUpsert = Options.BulkUpsertLookup?.Invoke(@event, index) is true; + if (string.IsNullOrWhiteSpace(index) + && string.IsNullOrWhiteSpace(id) + && templates is null + && isUpsert is false + && requireAlias is null or false + && listExecutedPipelines is null or false) + return Options.OperationMode == OperationMode.Index + ? (IndexNoParams, null) + : (CreateNoParams, null); + + var header = new BulkHeader + { + Id = id, + Index = index, + DynamicTemplates = templates, + RequireAlias = requireAlias, + ListExecutedPipelines = listExecutedPipelines + }; + var op = Options.OperationMode == OperationMode.Index + ? HeaderSerializationStrategy.Index + : Create; + if (isUpsert) + op = Update; + + return (op, header); + } /// protected override string TemplateName { get; } diff --git a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs index ddc9ff1..c0758e2 100644 --- a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs +++ b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs @@ -43,6 +43,9 @@ public IndexChannelOptions(ITransport transport) : base(transport) { } /// public Func? BulkOperationIdLookup { get; set; } + /// If true, the action must target an index alias. Defaults to false. + public Func? RequireAlias { get; set; } + /// /// Uses the callback provided to to determine if this is in fact an update operation /// If this returns true the document will be sent as an upsert operation diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs new file mode 100644 index 0000000..7cada19 --- /dev/null +++ b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkHeader.cs @@ -0,0 +1,31 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Collections.Generic; +using Elastic.Ingest.Elasticsearch.DataStreams; + +namespace Elastic.Ingest.Elasticsearch.Serialization; + +/// TODO +public readonly struct BulkHeader +{ + /// The index to write to, never set when writing using + public string? Index { get; init; } + + /// The id of the object being written, never set when writing using + public string? Id { get; init; } + + /// Require to point to an alias, never set when writing using + public bool? RequireAlias { get; init; } + + /// + /// A map from the full name of fields to the name of dynamic templates. Defaults to an empty map. If a name matches a dynamic template, + /// then that template will be applied regardless of other match predicates defined in the template. And if a field is already defined + /// in the mapping, then this parameter won't be used. + /// + public IDictionary? DynamicTemplates { get; init; } + + /// If true, the response will include the ingest pipelines that were executed. Defaults to false. + public bool? ListExecutedPipelines { get; init; } +} diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs deleted file mode 100644 index b5dd325..0000000 --- a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs +++ /dev/null @@ -1,98 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information -using System; -using System.Collections.Generic; -using System.Text.Json; -using System.Text.Json.Serialization; - -namespace Elastic.Ingest.Elasticsearch.Serialization; - -/// Represents the _bulk operation meta header -public abstract class BulkOperationHeader -{ - /// The index or data stream to write to - [JsonPropertyName("_index")] - public string? Index { get; init; } - - /// The id of the object being written - [JsonPropertyName("_id")] - public string? Id { get; init; } - - /// Require to point to an alias - [JsonPropertyName("require_alias")] - public bool? RequireAlias { get; init; } -} - -/// Represents the _bulk create operation meta header -[JsonConverter(typeof(BulkOperationHeaderConverter))] -public class CreateOperation : BulkOperationHeader -{ - /// - [JsonPropertyName("dynamic_templates")] - public Dictionary? DynamicTemplates { get; init; } -} - -/// Represents the _bulk index operation meta header -[JsonConverter(typeof(BulkOperationHeaderConverter))] -public class IndexOperation : BulkOperationHeader -{ - /// - [JsonPropertyName("dynamic_templates")] - public Dictionary? DynamicTemplates { get; init; } -} - -/// Represents the _bulk delete operation meta header -[JsonConverter(typeof(BulkOperationHeaderConverter))] -public class DeleteOperation : BulkOperationHeader -{ -} - -/// Represents the _bulk update operation meta header -[JsonConverter(typeof(BulkOperationHeaderConverter))] -public class UpdateOperation : BulkOperationHeader -{ -} - -internal class BulkOperationHeaderConverter : JsonConverter - where THeader : BulkOperationHeader -{ - public override THeader Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) => - throw new NotImplementedException(); - - public override void Write(Utf8JsonWriter writer, THeader value, JsonSerializerOptions options) - { - var op = value switch - { - CreateOperation _ => "create", - DeleteOperation _ => "delete", - IndexOperation _ => "index", - UpdateOperation _ => "update", - _ => throw new ArgumentOutOfRangeException(nameof(value), value, null) - }; - writer.WriteStartObject(); - writer.WritePropertyName(op); - writer.WriteStartObject(); - if (!string.IsNullOrWhiteSpace(value.Index)) - writer.WriteString("_index", value.Index); - if (!string.IsNullOrWhiteSpace(value.Id)) - writer.WriteString("_id", value.Id); - if (value.RequireAlias == true) - writer.WriteBoolean("require_alias", true); - if (value is CreateOperation c) - WriteDynamicTemplates(writer, options, c.DynamicTemplates); - if (value is IndexOperation i) - WriteDynamicTemplates(writer, options, i.DynamicTemplates); - - writer.WriteEndObject(); - writer.WriteEndObject(); - } - - private static void WriteDynamicTemplates(Utf8JsonWriter writer, JsonSerializerOptions options, Dictionary? templates) - { - if (templates is not { Count: > 0 }) return; - - writer.WritePropertyName("dynamic_templates"); - JsonSerializer.Serialize(writer, templates, options); - } -} diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkRequestDataFactory.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkRequestDataFactory.cs deleted file mode 100644 index 8f00950..0000000 --- a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkRequestDataFactory.cs +++ /dev/null @@ -1,164 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information - -#if NETSTANDARD2_1_OR_GREATER -using System.Buffers; -#else -using System.Collections.Generic; -#endif -using System; -using System.IO; -using System.Text.Json; -using System.Threading; -using System.Threading.Tasks; -using Elastic.Ingest.Elasticsearch.Indices; -using static Elastic.Ingest.Elasticsearch.ElasticsearchChannelStatics; - -namespace Elastic.Ingest.Elasticsearch.Serialization; - -/// -/// Provides static factory methods from producing request data for bulk requests. -/// -public static class BulkRequestDataFactory -{ -#if NETSTANDARD2_1_OR_GREATER - /// - /// Get the NDJSON request body bytes for a page of events. - /// - /// The type for the event being ingested. - /// A page of events. - /// The for the channel where the request will be written. - /// A function which takes an instance of and produces the operation header containing the action and optional meta data. - /// A of representing the entire request body in NDJSON format. - public static ReadOnlyMemory GetBytes(ArraySegment page, - ElasticsearchChannelOptionsBase options, Func createHeaderFactory) - { - // ArrayBufferWriter inserts comma's when serializing multiple times - // Hence the multiple writer.Resets() as advised on this feature request - // https://github.com/dotnet/runtime/issues/82314 - var bufferWriter = new ArrayBufferWriter(); - using var writer = new Utf8JsonWriter(bufferWriter, WriterOptions); - foreach (var @event in page.AsSpan()) - { - var indexHeader = createHeaderFactory(@event); - JsonSerializer.Serialize(writer, indexHeader, indexHeader.GetType(), SerializerOptions); - bufferWriter.Write(LineFeed); - writer.Reset(); - - if (indexHeader is UpdateOperation) - { - bufferWriter.Write(DocUpdateHeaderStart); - writer.Reset(); - } - - if (options.EventWriter?.WriteToArrayBuffer != null) - options.EventWriter.WriteToArrayBuffer(bufferWriter, @event); - else - JsonSerializer.Serialize(writer, @event, SerializerOptions); - writer.Reset(); - - if (indexHeader is UpdateOperation) - { - bufferWriter.Write(DocUpdateHeaderEnd); - writer.Reset(); - } - - bufferWriter.Write(LineFeed); - writer.Reset(); - } - return bufferWriter.WrittenMemory; - } -#endif - - /// - /// Asynchronously write the NDJSON request body for a page of events to . - /// - /// The type for the event being ingested. - /// A page of events. - /// The target for the request. - /// The for the channel where the request will be written. - /// A function which takes an instance of and produces the operation header containing the action and optional meta data. - /// The cancellation token to cancel operation. - /// - public static async Task WriteBufferToStreamAsync(ArraySegment page, Stream stream, - ElasticsearchChannelOptionsBase options, Func createHeaderFactory, - CancellationToken ctx = default) - { -#if NETSTANDARD2_1_OR_GREATER - var items = page; -#else - // needs cast prior to netstandard2.0 - IReadOnlyList items = page; -#endif - // for is okay on ArraySegment, foreach performs bad: - // https://antao-almada.medium.com/how-to-use-span-t-and-memory-t-c0b126aae652 - // ReSharper disable once ForCanBeConvertedToForeach - for (var i = 0; i < items.Count; i++) - { - var @event = items[i]; - if (@event == null) continue; - - var indexHeader = createHeaderFactory(@event); - await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(), SerializerOptions, ctx) - .ConfigureAwait(false); - await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false); - - if (indexHeader is UpdateOperation) - await stream.WriteAsync(DocUpdateHeaderStart, 0, DocUpdateHeaderStart.Length, ctx).ConfigureAwait(false); - - if (options.EventWriter?.WriteToStreamAsync != null) - await options.EventWriter.WriteToStreamAsync(stream, @event, ctx).ConfigureAwait(false); - else - await JsonSerializer.SerializeAsync(stream, @event, SerializerOptions, ctx) - .ConfigureAwait(false); - - if (indexHeader is UpdateOperation) - await stream.WriteAsync(DocUpdateHeaderEnd, 0, DocUpdateHeaderEnd.Length, ctx).ConfigureAwait(false); - - await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false); - } - } - - /// - /// Create the bulk operation header with the appropriate action and meta data for a bulk request targeting an index. - /// - /// The type for the event being ingested. - /// The for which the header will be produced. - /// The for the channel. - /// Control whether the index name is included in the meta data for the operation. - /// A instance. - public static BulkOperationHeader CreateBulkOperationHeaderForIndex(TEvent @event, IndexChannelOptions options, - bool skipIndexName = false) - { - var indexTime = options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now; - if (options.IndexOffset.HasValue) indexTime = indexTime.ToOffset(options.IndexOffset.Value); - - var index = skipIndexName ? string.Empty : string.Format(options.IndexFormat, indexTime); - - var id = options.BulkOperationIdLookup?.Invoke(@event); - - if (options.OperationMode == OperationMode.Index) - { - return skipIndexName - ? !string.IsNullOrWhiteSpace(id) ? new IndexOperation { Id = id } : new IndexOperation() - : !string.IsNullOrWhiteSpace(id) ? new IndexOperation { Index = index, Id = id } : new IndexOperation { Index = index }; - } - - if (options.OperationMode == OperationMode.Create) - { - return skipIndexName - ? !string.IsNullOrWhiteSpace(id) ? new CreateOperation { Id = id } : new CreateOperation() - : !string.IsNullOrWhiteSpace(id) ? new CreateOperation { Index = index, Id = id } : new CreateOperation { Index = index }; - } - - if (!string.IsNullOrWhiteSpace(id) && id != null && (options.BulkUpsertLookup?.Invoke(@event, id) ?? false)) - return skipIndexName ? new UpdateOperation { Id = id } : new UpdateOperation { Id = id, Index = index }; - - return - !string.IsNullOrWhiteSpace(id) - ? skipIndexName ? new IndexOperation { Id = id } : new IndexOperation { Index = index, Id = id } - : skipIndexName ? new CreateOperation() : new CreateOperation { Index = index }; - } -} - diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/HeaderSerialization.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/HeaderSerialization.cs new file mode 100644 index 0000000..f90a1d7 --- /dev/null +++ b/src/Elastic.Ingest.Elasticsearch/Serialization/HeaderSerialization.cs @@ -0,0 +1,22 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +namespace Elastic.Ingest.Elasticsearch.Serialization; + +/// TODO +public enum HeaderSerializationStrategy +{ + /// + Index, + /// + IndexNoParams, + /// + Create, + /// + CreateNoParams, + /// + Delete, + /// + Update, +} diff --git a/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs b/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs index 2f5070d..13a4123 100644 --- a/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs +++ b/src/Elastic.Ingest.OpenTelemetry/CustomOtlpTraceExporter.cs @@ -63,7 +63,7 @@ public TraceChannel(TraceChannelOptions options) : this(options, null) { } /// public TraceChannel(TraceChannelOptions options, ICollection>? callbackListeners) - : base(options, callbackListeners) { + : base(options, callbackListeners, nameof(TraceChannel)) { var o = new OtlpExporterOptions { Endpoint = options.Endpoint ?? new Uri("http://localhost:4317"), diff --git a/src/Elastic.Ingest.Transport/TransportChannelBase.cs b/src/Elastic.Ingest.Transport/TransportChannelBase.cs index 4c39cc1..5ae5c19 100644 --- a/src/Elastic.Ingest.Transport/TransportChannelBase.cs +++ b/src/Elastic.Ingest.Transport/TransportChannelBase.cs @@ -24,12 +24,12 @@ public abstract class TransportChannelBase - protected TransportChannelBase(TChannelOptions options, ICollection>? callbackListeners) - : base(options, callbackListeners) { } + protected TransportChannelBase(TChannelOptions options, ICollection>? callbackListeners, string diagnosticsName) + : base(options, callbackListeners, diagnosticsName) { } /// - protected TransportChannelBase(TChannelOptions options) - : base(options) { } + protected TransportChannelBase(TChannelOptions options, string diagnosticsName) + : base(options, diagnosticsName) { } /// Implement sending the current of the buffer to the output. /// diff --git a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs index c8aedc6..2e239ef 100644 --- a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/DataStreamIngestionTests.cs @@ -28,11 +28,8 @@ public async Task EnsureDocumentsEndUpInDataStream() var slim = new CountdownEvent(1); var options = new DataStreamChannelOptions(Client.Transport) { -#pragma warning disable CS0618 - UseReadOnlyMemory = true, -#pragma warning restore CS0618 DataStream = targetDataStream, - BufferOptions = new BufferOptions { WaitHandle = slim, OutboundBufferMaxSize = 1 } + BufferOptions = new BufferOptions { WaitHandle = slim, OutboundBufferMaxSize = 1 }, }; var channel = new DataStreamChannel(options); diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs index f1dc0e5..3f7987e 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/DataStreamChannelTests.cs @@ -2,6 +2,7 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information +using System; using System.IO; using System.Threading; using Elastic.Ingest.Elasticsearch.DataStreams; @@ -20,6 +21,7 @@ public void DataStreamChannel_UsesCorrectUrlAndOperationHeader() var wait = new ManualResetEvent(false); + Exception exception = null; using var channel = new DataStreamChannel(new DataStreamChannelOptions(Transport) { BufferOptions = new() @@ -27,6 +29,11 @@ public void DataStreamChannel_UsesCorrectUrlAndOperationHeader() OutboundBufferMaxSize = 1 }, DataStream = new("type"), + ExportExceptionCallback = e => + { + exception = e; + wait.Set(); + }, ExportResponseCallback = (response, _) => { callDetails = response.ApiCallDetails; @@ -35,7 +42,9 @@ public void DataStreamChannel_UsesCorrectUrlAndOperationHeader() }); channel.TryWrite(new TestDocument()); - wait.WaitOne(); + var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("because ExportResponseCallback should have been called"); + exception.Should().BeNull(); callDetails.Uri.AbsolutePath.Should().Be("/type-generic-default/_bulk"); diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs index c44a8c7..68aae42 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/IndexChannelTests.cs @@ -3,6 +3,8 @@ // See the LICENSE file in the project root for more information using System; +using System.Collections; +using System.Collections.Generic; using System.IO; using System.Threading; using Elastic.Ingest.Elasticsearch.Indices; @@ -22,18 +24,35 @@ public void IndexChannel_WithFixedIndexName_UsesCorrectUrlAndOperationHeader() = public void IndexChannel_WithDynamicIndexName_UsesCorrectUrlAndOperationHeader() => ExecuteAndAssert("/_bulk", "{\"create\":{\"_index\":\"dotnet-2023.07.29\"}}"); - private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader, string indexName = null) + [Fact] + public void IndexChannel_UsesOperationAndId() => + ExecuteAndAssert("/_bulk", "{\"index\":{\"_index\":\"dotnet-2023.07.29\",\"_id\":\"mydocid\"}}", id: "mydocid", operationMode: OperationMode.Index); + + [Fact] + public void IndexChannel_WritesCorrectHeaderWithAllOptions() => + ExecuteAndAssert("/fixed-index/_bulk", "{\"create\":{\"_id\":\"mydocid\",\"require_alias\":true,\"list_executed_pipelines\":true,\"dynamic_templates\":[{\"key1\":\"value1\"}]}}", "fixed-index", id: "mydocid", operationMode: OperationMode.Create, + requiresAlias: true, listExecutedPipelines: true, dynamicTemplates: new Dictionary { { "key1", "value1"} }); + + private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader, + string indexName = null, string id = null, OperationMode? operationMode = null, bool? requiresAlias = null, + bool? listExecutedPipelines = null, IDictionary dynamicTemplates = null) { ApiCallDetails callDetails = null; var wait = new ManualResetEvent(false); + Exception exception = null; var options = new IndexChannelOptions(Transport) { BufferOptions = new() { OutboundBufferMaxSize = 1 }, + ExportExceptionCallback = e => + { + exception = e; + wait.Set(); + }, ExportResponseCallback = (response, _) => { callDetails = response.ApiCallDetails; @@ -42,6 +61,21 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader TimestampLookup = _ => new DateTimeOffset(2023, 07, 29, 20, 00, 00, TimeSpan.Zero), }; + if (operationMode.HasValue) + options.OperationMode = operationMode.Value; + + if (!string.IsNullOrEmpty(id)) + options.BulkOperationIdLookup = _ => id; + + if (requiresAlias.HasValue) + options.RequireAlias = _ => requiresAlias.Value; + + if (dynamicTemplates is not null) + options.DynamicTemplateLookup = _ => dynamicTemplates; + + if (listExecutedPipelines.HasValue) + options.ListExecutedPipelines = _ => listExecutedPipelines.Value; + if (indexName is not null) { options.IndexFormat = indexName; @@ -50,7 +84,9 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader using var channel = new IndexChannel(options); channel.TryWrite(new TestDocument()); - wait.WaitOne(); + var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("because ExportResponseCallback should have been called"); + exception.Should().BeNull(); callDetails.Should().NotBeNull(); callDetails.Uri.AbsolutePath.Should().Be(expectedUrl); diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/Setup.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/Setup.cs index 9272497..764830d 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/Setup.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/Setup.cs @@ -59,9 +59,6 @@ public TestSession(DistributedTransport transport) ChannelOptions = new IndexChannelOptions(transport) { BufferOptions = BufferOptions, -#pragma warning disable CS0618 - UseReadOnlyMemory = true, -#pragma warning restore CS0618 ServerRejectionCallback = (_) => Interlocked.Increment(ref _rejections), ExportItemsAttemptCallback = (_, _) => Interlocked.Increment(ref _requests), ExportResponseCallback = (_, _) => Interlocked.Increment(ref _responses), diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/DataStreamChannelEventStrategyTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/DataStreamChannelEventStrategyTests.cs new file mode 100644 index 0000000..40356b1 --- /dev/null +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/DataStreamChannelEventStrategyTests.cs @@ -0,0 +1,87 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using Elastic.Ingest.Elasticsearch.DataStreams; +using Elastic.Ingest.Elasticsearch.Serialization; +using Elastic.Transport; +using FluentAssertions; +using Xunit; + +namespace Elastic.Ingest.Elasticsearch.Tests.Strategies; + +internal class TestDataStreamChannel(DataStreamChannelOptions options) + : DataStreamChannel(options) +{ + public List Strategies { get; } = new(); + + protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(DataStreamDocument @event) + { + var strategy = base.EventIndexStrategy(@event); + Strategies.Add(new TrackStrategy + { + Id = @event.Id, + Strategy = strategy.Item1, + Header = strategy.Item2 + }); + return strategy; + } +} + +public class DataStreamChannelEventStrategyTests : ChannelTestWithSingleDocResponseBase +{ + [Fact] + public void EmitsExpectedStrategies() + { + ApiCallDetails callDetails = null; + + var wait = new ManualResetEvent(false); + + Exception exception = null; + using var channel = new TestDataStreamChannel(new DataStreamChannelOptions(Transport) + { + BufferOptions = new() { OutboundBufferMaxSize = 3 }, + DynamicTemplateLookup = document => document.Id == 2 ? new Dictionary { { "id", "1" } } : null, + ListExecutedPipelines = document => document.Id == 3, + ExportExceptionCallback = e => + { + exception = e; + wait.Set(); + }, + ExportResponseCallback = (response, _) => + { + callDetails = response.ApiCallDetails; + wait.Set(); + } + }); + + channel.TryWrite(new DataStreamDocument()); //0 + channel.TryWrite(new DataStreamDocument()); //1 + channel.TryWrite(new DataStreamDocument()); //2 + var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("because ExportResponseCallback should have been called"); + exception.Should().BeNull(); + + callDetails.Uri.AbsolutePath.Should().Be("/datastreamdocument-generic-default/_bulk"); + + + channel.Strategies.Should().HaveCount(3); + var strategies = channel.Strategies.OrderBy(s => s.Id).ToList(); + + strategies[0].Strategy.Should().Be(HeaderSerializationStrategy.CreateNoParams); + strategies[0].Header.Should().BeNull(); + + strategies[1].Strategy.Should().Be(HeaderSerializationStrategy.Create); + strategies[1].Header.Should().NotBeNull(); + strategies[1].Header!.Value.DynamicTemplates.Should().NotBeNull(); + + strategies[2].Strategy.Should().Be(HeaderSerializationStrategy.Create); + strategies[2].Header.Should().NotBeNull(); + strategies[2].Header!.Value.ListExecutedPipelines.Should().BeTrue(); + + } +} diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventStrategyTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventStrategyTests.cs new file mode 100644 index 0000000..d2d69a0 --- /dev/null +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/IndexChannelEventStrategyTests.cs @@ -0,0 +1,99 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Ingest.Elasticsearch.Serialization; +using Elastic.Transport; +using FluentAssertions; +using Xunit; + +namespace Elastic.Ingest.Elasticsearch.Tests.Strategies; + +internal class TestIndexChannel(IndexChannelOptions options) : IndexChannel(options) +{ + public List Strategies { get; } = new(); + + protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(IndexDocument @event) + { + var strategy = base.EventIndexStrategy(@event); + Strategies.Add(new TrackStrategy + { + Id = @event.Id, + Strategy = strategy.Item1, + Header = strategy.Item2 + }); + return strategy; + } +} + +public class IndexChannelEventStrategyTests : ChannelTestWithSingleDocResponseBase +{ + [Fact] + public void EmitsExpectedStrategies() + { + ApiCallDetails callDetails = null; + + var wait = new ManualResetEvent(false); + + Exception exception = null; + using var channel = new TestIndexChannel(new IndexChannelOptions(Transport) + { + IndexFormat = "test-index", + BufferOptions = new() { OutboundBufferMaxSize = 6 }, + DynamicTemplateLookup = document => document.Id == 2 ? new Dictionary { { "id", "1" } } : null, + BulkOperationIdLookup = document => document.Id == 3 ? "33" : null, + BulkUpsertLookup = (document, _) => document.Id == 4, + RequireAlias = document => document.Id == 5, + ListExecutedPipelines = document => document.Id == 6, + ExportExceptionCallback = e => + { + exception = e; + wait.Set(); + }, + ExportResponseCallback = (response, _) => + { + callDetails = response.ApiCallDetails; + wait.Set(); + } + }); + + channel.TryWrite(new IndexDocument()); //0 + channel.TryWrite(new IndexDocument()); //1 + channel.TryWrite(new IndexDocument()); //2 + channel.TryWrite(new IndexDocument()); //3 + channel.TryWrite(new IndexDocument()); //4 + channel.TryWrite(new IndexDocument()); //5 + var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("because ExportResponseCallback should have been called"); + exception.Should().BeNull(); + + callDetails.Uri.AbsolutePath.Should().Be("/test-index/_bulk"); + + channel.Strategies.Should().HaveCount(6); + var strategies = channel.Strategies.OrderBy(s => s.Id).ToList(); + + strategies[0].Id.Should().Be(1); + strategies[0].Strategy.Should().Be(HeaderSerializationStrategy.CreateNoParams); + strategies[0].Header.Should().BeNull(); + + strategies[1].Strategy.Should().Be(HeaderSerializationStrategy.Create); + strategies[1].Header.Should().NotBeNull(); + strategies[1].Header!.Value.DynamicTemplates.Should().NotBeNull(); + + strategies[2].Strategy.Should().Be(HeaderSerializationStrategy.Create); + strategies[2].Header.Should().NotBeNull(); + strategies[2].Header!.Value.Id.Should().Be("33"); + + strategies[3].Strategy.Should().Be(HeaderSerializationStrategy.Update); + + strategies[4].Header!.Value.RequireAlias.Should().BeTrue(); + + strategies[5].Header!.Value.ListExecutedPipelines.Should().BeTrue(); + + } +} diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/TrackDocument.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/TrackDocument.cs new file mode 100644 index 0000000..aab0029 --- /dev/null +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/Strategies/TrackDocument.cs @@ -0,0 +1,31 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using Elastic.Ingest.Elasticsearch.Serialization; + +namespace Elastic.Ingest.Elasticsearch.Tests.Strategies; + +public class IndexDocument +{ + private static int Counter; + + public DateTimeOffset Timestamp { get; set; } + public int Id { get; } = ++Counter; +} +public class DataStreamDocument +{ + private static int Counter; + + public DateTimeOffset Timestamp { get; set; } + public int Id { get; } = ++Counter; +} + +public class TrackStrategy +{ + public int Id { get; init; } + public HeaderSerializationStrategy Strategy { get; init; } + public BulkHeader? Header { get; init; } + +} diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs b/tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs index cef3ae6..bb34c52 100644 --- a/tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs +++ b/tests/Elastic.Ingest.Elasticsearch.Tests/SubPathTests.cs @@ -29,12 +29,18 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader ApiCallDetails callDetails = null; var wait = new ManualResetEvent(false); + Exception exception = null; var options = new IndexChannelOptions(Transport) { BufferOptions = new() { OutboundBufferMaxSize = 1 }, + ExportExceptionCallback = e => + { + exception = e; + wait.Set(); + }, ExportResponseCallback = (response, _) => { callDetails = response.ApiCallDetails; @@ -51,7 +57,9 @@ private void ExecuteAndAssert(string expectedUrl, string expectedOperationHeader using var channel = new IndexChannel(options); channel.TryWrite(new TestDocument()); - wait.WaitOne(); + var signalled = wait.WaitOne(TimeSpan.FromSeconds(5)); + signalled.Should().BeTrue("because ExportResponseCallback should have been called"); + exception.Should().BeNull(); callDetails.Should().NotBeNull(); callDetails.Uri.AbsolutePath.Should().Be(expectedUrl);