Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ public class BulkIngestionBenchmarks
[ParamsAllValues]
public bool DisableDiagnostics { get; set; }

[ParamsAllValues]
public bool UseReadOnlyMemory { get; set; }

[GlobalSetup]
public void Setup()
{
Expand All @@ -47,7 +44,6 @@ public void Setup()
OutboundBufferMaxSize = MaxExportSize
},
DisableDiagnostics = DisableDiagnostics,
UseReadOnlyMemory = UseReadOnlyMemory,
IndexFormat = "stock-data-v8",
OutboundChannelExitedCallback = () => _waitHandle.Set(),
#if DEBUG
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ public class BulkRequestCreationForDataStreamBenchmarks
private ITransport? _transport;
private TransportConfiguration? _transportConfiguration;
private StockData[] _data = Array.Empty<StockData>();
private readonly BulkOperationHeader _bulkOperationHeader = new CreateOperation();

public Stream MemoryStream { get; } = new MemoryStream();

Expand All @@ -40,12 +39,12 @@ public void Setup()
_data = StockData.CreateSampleData(DocumentsToIndex);
}

[Benchmark(Baseline = true)]
public async Task WriteToStreamAsync()
{
MemoryStream.Position = 0;
var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, _ => _bulkOperationHeader);
var postData = PostData.ReadOnlyMemory(bytes);
await postData.WriteAsync(MemoryStream, _transportConfiguration!, false, CancellationToken.None);
}
//[Benchmark(Baseline = true)]
//public async Task WriteToStreamAsync()
//{
// MemoryStream.Position = 0;
// var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, _ => _bulkOperationHeader);
// var postData = PostData.ReadOnlyMemory(bytes);
// await postData.WriteAsync(MemoryStream, _transportConfiguration!, false, CancellationToken.None);
//}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ public void Setup()
_data = StockData.CreateSampleData(DocumentsToIndex);
}

[Benchmark(Baseline = true)]
public async Task WriteToStreamAsync()
{
MemoryStream.Position = 0;
var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, true));
var postData = PostData.ReadOnlyMemory(bytes);
await postData.WriteAsync(MemoryStream, _transportConfiguration!, false, CancellationToken.None);
}
// [Benchmark(Baseline = true)]
// public async Task WriteToStreamAsync()
// {
// MemoryStream.Position = 0;
// var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, true));
// var postData = PostData.ReadOnlyMemory(bytes);
// await postData.WriteAsync(MemoryStream, _transportConfiguration!, false, CancellationToken.None);
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ public void Setup()
_data = StockData.CreateSampleData(DocumentsToIndex);
}

[Benchmark(Baseline = true)]
public async Task DynamicIndexName_WriteToStreamAsync()
{
MemoryStream.Position = 0;
var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, false));
var postData = PostData.ReadOnlyMemory(bytes);
await postData.WriteAsync(MemoryStream, _transportConfiguration!, false, CancellationToken.None);
}
// [Benchmark(Baseline = true)]
// public async Task DynamicIndexName_WriteToStreamAsync()
// {
// MemoryStream.Position = 0;
// var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, false));
// var postData = PostData.ReadOnlyMemory(bytes);
// await postData.WriteAsync(MemoryStream, _transportConfiguration!, false, CancellationToken.None);
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

var bm = new BulkRequestCreationWithFixedIndexNameBenchmarks();
bm.Setup();
await bm.WriteToStreamAsync();
//await bm.WriteToStreamAsync();

var length = bm.MemoryStream.Length;

Expand Down
20 changes: 12 additions & 8 deletions src/Elastic.Channels/BufferedChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,26 +123,26 @@ public abstract class BufferedChannelBase<TChannelOptions, TEvent, TResponse>
internal InboundBuffer<TEvent> InboundBuffer { get; }

/// <inheritdoc cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>
protected BufferedChannelBase(TChannelOptions options) : this(options, null) { }
protected BufferedChannelBase(TChannelOptions options, string diagnosticsName) : this(options, null, diagnosticsName) { }

/// <inheritdoc cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>
protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, TResponse>>? callbackListeners)
protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, TResponse>>? callbackListeners, string diagnosticsName)
{
TokenSource = options.CancellationToken.HasValue
? CancellationTokenSource.CreateLinkedTokenSource(options.CancellationToken.Value)
: new CancellationTokenSource();
Options = options;

var listeners = callbackListeners == null ? new[] { Options } : callbackListeners.Concat(new[] { Options }).ToArray();
var listeners = callbackListeners == null ? [Options] : callbackListeners.Concat([Options]).ToArray();
DiagnosticsListener = listeners
.Select(l => (l is IChannelDiagnosticsListener c) ? c : null)
.FirstOrDefault(e => e != null);
.OfType<IChannelDiagnosticsListener?>()
.FirstOrDefault();
if (DiagnosticsListener == null && !options.DisableDiagnostics)
{
// if no debug listener was already provided but was requested explicitly create one.
var l = new ChannelDiagnosticsListener<TEvent, TResponse>(GetType().Name);
var l = new ChannelDiagnosticsListener<TEvent, TResponse>(diagnosticsName);
DiagnosticsListener = l;
listeners = listeners.Concat(new[] { l }).ToArray();
listeners = listeners.Concat([l]).ToArray();
}
_callbacks = new ChannelCallbackInvoker<TEvent, TResponse>(listeners);

Expand Down Expand Up @@ -311,7 +311,11 @@ private async Task ConsumeOutboundEventsAsync()
}
}
await Task.WhenAll(_taskList).ConfigureAwait(false);
#if NET8_0_OR_GREATER
await _exitCancelSource.CancelAsync().ConfigureAwait(false);
#else
_exitCancelSource.Cancel();
#endif
_callbacks.OutboundChannelExitedCallback?.Invoke();
}

Expand Down Expand Up @@ -443,7 +447,7 @@ async Task<bool> AsyncSlowPathAsync(IOutboundBuffer<TEvent> b)
}

/// <inheritdoc cref="object.ToString"/>>
public override string ToString()
public override string? ToString()
{
if (DiagnosticsListener == null) return base.ToString();
var sb = new StringBuilder();
Expand Down
4 changes: 2 additions & 2 deletions src/Elastic.Channels/Diagnostics/NoopBufferedChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ public class NoopChannelOptions : ChannelOptionsBase<NoopEvent, NoopResponse>
public NoopBufferedChannel(
NoopChannelOptions options,
ICollection<IChannelCallbacks<NoopEvent, NoopResponse>>? channelListeners = null
) : base(options, channelListeners) { }
) : base(options, channelListeners, nameof(NoopBufferedChannel)) { }

/// <inheritdoc cref="NoopBufferedChannel"/>
public NoopBufferedChannel(
BufferOptions options,
ICollection<IChannelCallbacks<NoopEvent, NoopResponse>>? channelListeners = null,
bool observeConcurrency = false
) : base(new NoopChannelOptions { BufferOptions = options, TrackConcurrency = observeConcurrency }, channelListeners)
) : base(new NoopChannelOptions { BufferOptions = options, TrackConcurrency = observeConcurrency }, channelListeners, nameof(NoopBufferedChannel))
{

}
Expand Down
2 changes: 1 addition & 1 deletion src/Elastic.Channels/Elastic.Channels.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<TargetFrameworks>netstandard2.0;netstandard2.1;net8.0</TargetFrameworks>
<Description>Provides components to build a buffer-backed channel that flushes batches of data in a controlled (Max N || Max Duration) manner.</Description>
<PackageTags>elastic, channels, buffer</PackageTags>
<LangVersion>latest</LangVersion>
Expand Down
8 changes: 4 additions & 4 deletions src/Elastic.Channels/ResponseItemsBufferedChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ public abstract class ResponseItemsBufferedChannelBase<TChannelOptions, TEvent,
where TEvent : class
{
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}"/>
protected ResponseItemsBufferedChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, TResponse>>? callbackListeners)
: base(options, callbackListeners) { }
protected ResponseItemsBufferedChannelBase(TChannelOptions options, ICollection<IChannelCallbacks<TEvent, TResponse>>? callbackListeners, string diagnosticsName)
: base(options, callbackListeners, diagnosticsName) { }

/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}"/>
protected ResponseItemsBufferedChannelBase(TChannelOptions options)
: base(options) { }
protected ResponseItemsBufferedChannelBase(TChannelOptions options, string diagnosticsName)
: base(options, diagnosticsName) { }

/// <summary> Based on <typeparamref name="TResponse"/> should return a bool indicating if retry is needed</summary>
protected abstract bool Retry(TResponse response);
Expand Down
4 changes: 2 additions & 2 deletions src/Elastic.Ingest.Apm/ApmChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ namespace Elastic.Ingest.Apm;
/// </summary>
public class ApmChannel : TransportChannelBase<ApmChannelOptions, IIntakeObject, EventIntakeResponse, IntakeErrorItem>
{
/// <inheritdoc cref="ApmChannel"/>
public ApmChannel(ApmChannelOptions options) : base(options) { }
/// <inheritdoc cref="ApmChannel"/>
public ApmChannel(ApmChannelOptions options) : base(options, nameof(ApmChannel)) { }

//retry if APM server returns 429
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.Retry"/>
Expand Down
22 changes: 16 additions & 6 deletions src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,34 @@ namespace Elastic.Ingest.Elasticsearch.DataStreams;
public class DataStreamChannel<TEvent> : ElasticsearchChannelBase<TEvent, DataStreamChannelOptions<TEvent>>
where TEvent : class
{
private readonly CreateOperation _fixedHeader;
private readonly string _url;

/// <inheritdoc cref="DataStreamChannel{TEvent}"/>
public DataStreamChannel(DataStreamChannelOptions<TEvent> options) : this(options, null) { }

/// <inheritdoc cref="DataStreamChannel{TEvent}"/>
public DataStreamChannel(DataStreamChannelOptions<TEvent> options, ICollection<IChannelCallbacks<TEvent, BulkResponse>>? callbackListeners) : base(options, callbackListeners)
public DataStreamChannel(DataStreamChannelOptions<TEvent> options, ICollection<IChannelCallbacks<TEvent, BulkResponse>>? callbackListeners, string? diagnosticsName = null)
: base(options, callbackListeners, diagnosticsName ?? nameof(DataStreamChannel<TEvent>))
{
var dataStream = Options.DataStream.ToString();

_url = $"{dataStream}/{base.BulkPathAndQuery}";

_fixedHeader = new CreateOperation();
}

/// <inheritdoc cref="ElasticsearchChannelBase{TEvent,TChannelOptions}.CreateBulkOperationHeader"/>
protected override BulkOperationHeader CreateBulkOperationHeader(TEvent @event) => _fixedHeader;
/// <inheritdoc cref="EventIndexStrategy"/>
protected override (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TEvent @event)
{
var listExecutedPipelines = Options.ListExecutedPipelines?.Invoke(@event);
var templates = Options.DynamicTemplateLookup?.Invoke(@event);
if (templates is null && listExecutedPipelines is null or false)
return (HeaderSerializationStrategy.CreateNoParams, null);
var header = new BulkHeader
{
DynamicTemplates = templates,
ListExecutedPipelines = listExecutedPipelines
};
return (HeaderSerializationStrategy.Create, header);
}

/// <inheritdoc cref="ElasticsearchChannelBase{TEvent,TChannelOptions}.TemplateName"/>
protected override string TemplateName => Options.DataStream.GetTemplateName();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<TargetFrameworks>netstandard2.0;netstandard2.1;net8.0</TargetFrameworks>
<Description>Offers an easy to use ChannelWriter implementation to push data concurrently to Elasticsearch using Elastic.Transport</Description>
<PackageTags>elastic, channels, elasticsearch, ingest</PackageTags>
<LangVersion>latest</LangVersion>
<Nullable>enable</Nullable>
<IsPackable>True</IsPackable>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<ItemGroup>
Expand Down
90 changes: 90 additions & 0 deletions src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Indices;
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Ingest.Transport;
using static Elastic.Ingest.Elasticsearch.ElasticsearchChannelStatics;

namespace Elastic.Ingest.Elasticsearch;

/// <summary>
/// An abstract base class for both <see cref="DataStreamChannel{TEvent}"/> and <see cref="IndexChannel{TEvent}"/>
/// <para>Coordinates most of the sending to- and bootstrapping of Elasticsearch</para>
/// </summary>
public abstract partial class ElasticsearchChannelBase<TEvent, TChannelOptions>
where TChannelOptions : ElasticsearchChannelOptionsBase<TEvent>
where TEvent : class
{
/// <summary> TODO </summary>
protected abstract (HeaderSerializationStrategy, BulkHeader?) EventIndexStrategy(TEvent @event);

/// <summary>
/// Asynchronously write the NDJSON request body for a page of <typeparamref name="TEvent"/> events to <see cref="Stream"/>.
/// </summary>
/// <param name="page">A page of <typeparamref name="TEvent"/> events.</param>
/// <param name="stream">The target <see cref="Stream"/> for the request.</param>
/// <param name="options">The <see cref="ElasticsearchChannelOptionsBase{TEvent}"/> for the channel where the request will be written.</param>
/// <param name="ctx">The cancellation token to cancel operation.</param>
/// <returns></returns>
public async Task WriteBufferToStreamAsync(
ArraySegment<TEvent> page, Stream stream, ElasticsearchChannelOptionsBase<TEvent> options, CancellationToken ctx = default)
{
#if NETSTANDARD2_1_OR_GREATER
var items = page;
#else
// needs cast prior to netstandard2.0
IReadOnlyList<TEvent> items = page;
#endif
// for is okay on ArraySegment, foreach performs bad:
// https://antao-almada.medium.com/how-to-use-span-t-and-memory-t-c0b126aae652
// ReSharper disable once ForCanBeConvertedToForeach
for (var i = 0; i < items.Count; i++)
{
var @event = items[i];
if (@event == null) continue;

var (op, header) = EventIndexStrategy(@event);
switch (op)
{
case HeaderSerializationStrategy.IndexNoParams:
await SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false);
break;
case HeaderSerializationStrategy.CreateNoParams:
await SerializePlainCreateHeaderAsync(stream, ctx).ConfigureAwait(false);
break;
case HeaderSerializationStrategy.Index:
case HeaderSerializationStrategy.Create:
case HeaderSerializationStrategy.Delete:
case HeaderSerializationStrategy.Update:
await SerializeHeaderAsync(stream, op, ref header, ctx).ConfigureAwait(false);
break;
}

await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false);

if (op == HeaderSerializationStrategy.Update)
await stream.WriteAsync(DocUpdateHeaderStart, 0, DocUpdateHeaderStart.Length, ctx).ConfigureAwait(false);

if (options.EventWriter?.WriteToStreamAsync != null)
await options.EventWriter.WriteToStreamAsync(stream, @event, ctx).ConfigureAwait(false);
else
await JsonSerializer.SerializeAsync(stream, @event, SerializerOptions, ctx)
.ConfigureAwait(false);

if (op == HeaderSerializationStrategy.Update)
await stream.WriteAsync(DocUpdateHeaderEnd, 0, DocUpdateHeaderEnd.Length, ctx).ConfigureAwait(false);

await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false);
}
}
}
Loading