Skip to content

Commit 4085a8b

Browse files
authored
Small refactorings before releasing 0.3.0 (#19)
1 parent 1104303 commit 4085a8b

File tree

22 files changed

+161
-150
lines changed

22 files changed

+161
-150
lines changed

examples/Elastic.Channels.Example/Drain.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@ public static class Drain
4747
var bufferOptions = new BufferOptions
4848
{
4949
WaitHandle = new CountdownEvent(expectedSentBuffers),
50-
MaxInFlightMessages = maxInFlight,
51-
MaxConsumerBufferSize = bufferSize,
52-
ConcurrentConsumers = concurrency,
53-
MaxConsumerBufferLifetime = TimeSpan.FromSeconds(20)
50+
InboundBufferMaxSize = maxInFlight,
51+
OutboundBufferMaxSize = bufferSize,
52+
ExportMaxConcurrency = concurrency,
53+
OutboundBufferMaxLifetime = TimeSpan.FromSeconds(20)
5454

5555
};
5656
var channel = new DiagnosticsBufferedChannel(bufferOptions, observeConcurrency: false);

examples/Elastic.Channels.Example/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
Console.WriteLine(channel);
4444
Console.WriteLine();
4545

46-
Console.WriteLine($"Written buffers: {channel.SentBuffersCount:N0}");
46+
Console.WriteLine($"Written buffers: {channel.ExportedBuffers:N0}");
4747
Console.WriteLine($"Written events: {writtenElastic:N0} events");
4848
Console.WriteLine($"ObservedConcurrency: {channel.ObservedConcurrency:N0}");
4949
Console.WriteLine($"Duration: {sw.Elapsed:g}");

examples/Elastic.Ingest.Apm.Example/Program.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ private static int Main(string[] args)
4141
var options =
4242
new BufferOptions
4343
{
44-
ConcurrentConsumers = 1,
45-
MaxConsumerBufferSize = 200,
46-
MaxConsumerBufferLifetime = TimeSpan.FromSeconds(10),
44+
ExportMaxConcurrency = 1,
45+
OutboundBufferMaxSize = 200,
46+
OutboundBufferMaxLifetime = TimeSpan.FromSeconds(10),
4747
WaitHandle = handle,
48-
MaxRetries = 3,
49-
BackoffPeriod = times => TimeSpan.FromMilliseconds(1),
50-
BufferExportedCallback = () => Console.WriteLine("Flushed"),
48+
ExportMaxRetries = 3,
49+
ExportBackoffPeriod = times => TimeSpan.FromMilliseconds(1),
50+
ExportBufferCallback = () => Console.WriteLine("Flushed"),
5151
};
5252
var channelOptions = new ApmChannelOptions(transport)
5353
{
@@ -61,7 +61,7 @@ private static int Main(string[] args)
6161
},
6262
ExportMaxRetriesCallback = (list) => Interlocked.Increment(ref _maxRetriesExceeded),
6363
ExportRetryCallback = (list) => Interlocked.Increment(ref _retries),
64-
ExceptionCallback = (e) => _exception = e
64+
ExportExceptionCallback = (e) => _exception = e
6565
};
6666
var channel = new ApmChannel(channelOptions);
6767

src/Elastic.Channels/BufferOptions.cs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,44 +11,51 @@ public class BufferOptions
1111
{
1212
/// <summary>
1313
/// The maximum number of in flight instances that can be queued in memory. If this threshold is reached, events will be dropped
14+
/// <para>Defaults to <c>100_000</c></para>
1415
/// </summary>
15-
public int MaxInFlightMessages { get; set; } = 100_000;
16+
public int InboundBufferMaxSize { get; set; } = 100_000;
1617

1718
/// <summary>
18-
/// The number of events a local buffer should reach before sending the events in a single call to Elasticsearch.
19+
/// The maximum size to export to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/> at once.
20+
/// <para>Defaults to <c>1_000</c></para>
1921
/// </summary>
20-
public int MaxConsumerBufferSize { get; set; } = 1_000;
22+
public int OutboundBufferMaxSize { get; set; } = 1_000;
2123

2224
/// <summary>
23-
/// The maximum number of times that an item that returns with a retryable status code is retried to be stored in Elasticsearch.
24-
/// <see cref="BackoffPeriod"/> to implement a backoff period of your choosing. MaxRetries default to 3.
25+
/// The maximum lifetime of a buffer to export to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/>.
26+
/// If a buffer is older then the configured <see cref="OutboundBufferMaxLifetime"/> it will be flushed to
27+
/// <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/> regardless of it's current size
28+
/// <para>Defaults to <c>5 seconds</c></para>
2529
/// </summary>
26-
public int MaxRetries { get; set; } = 3;
30+
public TimeSpan OutboundBufferMaxLifetime { get; set; } = TimeSpan.FromSeconds(5);
2731

2832
/// <summary>
29-
/// A consumer builds up a local buffer until <see cref="MaxConsumerBufferSize"/> is reached. If events come in too slow, these
30-
/// events could end up taking forever to be sent to Elasticsearch. This controls how long a buffer may exist before a flush is triggered.
33+
/// The maximum number of consumers allowed to poll for new events on the channel.
34+
/// <para>Defaults to <c>1</c>, increase to introduce concurrency.</para>
3135
/// </summary>
32-
public TimeSpan MaxConsumerBufferLifetime { get; set; } = TimeSpan.FromSeconds(5);
36+
public int ExportMaxConcurrency { get; set; } = 1;
3337

3438
/// <summary>
35-
/// The maximum number of consumers allowed to poll for new events on the channel. Defaults to 1, increase to introduce concurrency.
39+
/// The times to retry an export if <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.RetryBuffer"/> yields items to retry.
40+
/// <para>Whether or not items are selected for retrying depends on the actual channel implementation</para>
41+
/// <see cref="ExportBackoffPeriod"/> to implement a backoff period of your choosing.
42+
/// <para>Defaults to <c>3</c>, when <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.RetryBuffer"/> yields any items</para>
3643
/// </summary>
37-
public int ConcurrentConsumers { get; set; } = 1;
44+
public int ExportMaxRetries { get; set; } = 3;
3845

3946

4047
/// <summary>
4148
/// A function to calculate the backoff period, gets passed the number of retries attempted starting at 0.
4249
/// By default backs off in increments of 2 seconds.
4350
/// </summary>
44-
public Func<int, TimeSpan> BackoffPeriod { get; set; } = (i) => TimeSpan.FromSeconds(2 * (i + 1));
51+
public Func<int, TimeSpan> ExportBackoffPeriod { get; set; } = (i) => TimeSpan.FromSeconds(2 * (i + 1));
4552

4653
/// <summary>
4754
/// Called once after a buffer has been flushed, if the buffer is retried this callback is only called once
48-
/// all retries have been exhausted. Its called regardless of whether the call to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Send"/>
55+
/// all retries have been exhausted. Its called regardless of whether the call to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/>
4956
/// succeeded.
5057
/// </summary>
51-
public Action? BufferExportedCallback { get; set; }
58+
public Action? ExportBufferCallback { get; set; }
5259

5360
/// <summary>
5461
/// Allows you to inject a <see cref="CountdownEvent"/> to wait for N number of buffers to flush.

src/Elastic.Channels/BufferedChannelBase.cs

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Threading;
88
using System.Threading.Channels;
99
using System.Threading.Tasks;
10+
using Elastic.Channels.Buffers;
1011

1112
namespace Elastic.Channels;
1213

@@ -48,10 +49,10 @@ protected BufferedChannelBase(TChannelOptions options)
4849
{
4950
TokenSource = new CancellationTokenSource();
5051
Options = options;
51-
var maxConsumers = Math.Max(1, BufferOptions.ConcurrentConsumers);
52+
var maxConsumers = Math.Max(1, BufferOptions.ExportMaxConcurrency);
5253
_throttleTasks = new SemaphoreSlim(maxConsumers, maxConsumers);
5354
_signal = options.BufferOptions.WaitHandle;
54-
var maxIn = Math.Max(1, BufferOptions.MaxInFlightMessages);
55+
var maxIn = Math.Max(1, BufferOptions.InboundBufferMaxSize);
5556
InChannel = Channel.CreateBounded<TEvent>(new BoundedChannelOptions(maxIn)
5657
{
5758
SingleReader = false,
@@ -63,8 +64,8 @@ protected BufferedChannelBase(TChannelOptions options)
6364
// DropWrite will make `TryWrite` always return true, which is not what we want.
6465
FullMode = BoundedChannelFullMode.Wait
6566
});
66-
// The minimum out buffer the max of (1 or MaxConsumerBufferSize) as long as it does not exceed MaxInFlightMessages
67-
var maxOut = Math.Min(BufferOptions.MaxInFlightMessages, Math.Max(1, BufferOptions.MaxConsumerBufferSize));
67+
// The minimum out buffer the max of (1 or OutboundBufferMaxSize) as long as it does not exceed InboundBufferMaxSize
68+
var maxOut = Math.Min(BufferOptions.InboundBufferMaxSize, Math.Max(1, BufferOptions.OutboundBufferMaxSize));
6869
OutChannel = Channel.CreateBounded<IOutboundBuffer<TEvent>>(
6970
new BoundedChannelOptions(maxOut)
7071
{
@@ -78,12 +79,12 @@ protected BufferedChannelBase(TChannelOptions options)
7879
FullMode = BoundedChannelFullMode.Wait
7980
});
8081

81-
InboundBuffer = new InboundBuffer<TEvent>(maxOut, BufferOptions.MaxConsumerBufferLifetime);
82+
InboundBuffer = new InboundBuffer<TEvent>(maxOut, BufferOptions.OutboundBufferMaxLifetime);
8283

8384
_outThread = Task.Factory.StartNew(async () => await ConsumeOutboundEvents().ConfigureAwait(false),
8485
TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness);
8586
_inThread = Task.Factory.StartNew(async () =>
86-
await ConsumeInboundEvents(maxOut, BufferOptions.MaxConsumerBufferLifetime)
87+
await ConsumeInboundEvents(maxOut, BufferOptions.OutboundBufferMaxLifetime)
8788
.ConfigureAwait(false)
8889
, TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness
8990
);
@@ -105,9 +106,12 @@ await ConsumeInboundEvents(maxOut, BufferOptions.MaxConsumerBufferLifetime)
105106

106107
public override bool TryWrite(TEvent item)
107108
{
108-
if (InChannel.Writer.TryWrite(item)) return true;
109-
110-
Options.PublishRejectionCallback?.Invoke(item);
109+
if (InChannel.Writer.TryWrite(item))
110+
{
111+
Options.PublishToInboundChannelCallback?.Invoke();
112+
return true;
113+
}
114+
Options.PublishToInboundChannelFailureCallback?.Invoke();
111115
return false;
112116
}
113117

@@ -117,16 +121,14 @@ public virtual async Task<bool> WaitToWriteAsync(TEvent item, CancellationToken
117121
if (await InChannel.Writer.WaitToWriteAsync(ctx).ConfigureAwait(false) &&
118122
InChannel.Writer.TryWrite(item))
119123
{
120-
Options.PublishToInboundChannel?.Invoke();
124+
Options.PublishToInboundChannelCallback?.Invoke();
121125
return true;
122126
}
123-
Options.PublishToInboundChannelFailure?.Invoke();
124-
125-
Options.PublishRejectionCallback?.Invoke(item);
127+
Options.PublishToInboundChannelFailureCallback?.Invoke();
126128
return false;
127129
}
128130

129-
protected abstract Task<TResponse> Send(IReadOnlyCollection<TEvent> buffer, CancellationToken ctx = default);
131+
protected abstract Task<TResponse> Export(IReadOnlyCollection<TEvent> buffer, CancellationToken ctx = default);
130132

131133
private static readonly IReadOnlyCollection<TEvent> DefaultRetryBuffer = new TEvent[] { };
132134

@@ -138,9 +140,9 @@ IWriteTrackingBuffer statistics
138140

139141
private async Task ConsumeOutboundEvents()
140142
{
141-
Options.OutboundChannelStarted?.Invoke();
143+
Options.OutboundChannelStartedCallback?.Invoke();
142144

143-
var maxConsumers = Options.BufferOptions.ConcurrentConsumers;
145+
var maxConsumers = Options.BufferOptions.ExportMaxConcurrency;
144146
var taskList = new List<Task>(maxConsumers);
145147

146148
while (await OutChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
@@ -165,12 +167,12 @@ private async Task ConsumeOutboundEvents()
165167
}
166168
}
167169
await Task.WhenAll(taskList).ConfigureAwait(false);
168-
Options.OutboundChannelExited?.Invoke();
170+
Options.OutboundChannelExitedCallback?.Invoke();
169171
}
170172

171173
private async Task ExportBuffer(IReadOnlyCollection<TEvent> items, IWriteTrackingBuffer buffer)
172174
{
173-
var maxRetries = Options.BufferOptions.MaxRetries;
175+
var maxRetries = Options.BufferOptions.ExportMaxRetries;
174176
for (var i = 0; i <= maxRetries && items.Count > 0; i++)
175177
{
176178
if (TokenSource.Token.IsCancellationRequested) break;
@@ -180,12 +182,12 @@ private async Task ExportBuffer(IReadOnlyCollection<TEvent> items, IWriteTrackin
180182
TResponse? response;
181183
try
182184
{
183-
response = await Send(items, TokenSource.Token).ConfigureAwait(false);
185+
response = await Export(items, TokenSource.Token).ConfigureAwait(false);
184186
Options.ExportResponseCallback?.Invoke(response, buffer);
185187
}
186188
catch (Exception e)
187189
{
188-
Options.ExceptionCallback?.Invoke(e);
190+
Options.ExportExceptionCallback?.Invoke(e);
189191
break;
190192
}
191193

@@ -195,21 +197,21 @@ private async Task ExportBuffer(IReadOnlyCollection<TEvent> items, IWriteTrackin
195197
var atEndOfRetries = i == maxRetries;
196198
if (items.Count > 0 && !atEndOfRetries)
197199
{
198-
await Task.Delay(Options.BufferOptions.BackoffPeriod(i), TokenSource.Token).ConfigureAwait(false);
200+
await Task.Delay(Options.BufferOptions.ExportBackoffPeriod(i), TokenSource.Token).ConfigureAwait(false);
199201
Options.ExportRetryCallback?.Invoke(items);
200202
}
201203
// otherwise if retryable items still exist and the user wants to be notified notify the user
202204
else if (items.Count > 0 && atEndOfRetries)
203205
Options.ExportMaxRetriesCallback?.Invoke(items);
204206
}
205-
Options.BufferOptions.BufferExportedCallback?.Invoke();
207+
Options.BufferOptions.ExportBufferCallback?.Invoke();
206208
if (_signal is { IsSet: false })
207209
_signal.Signal();
208210
}
209211

210212
private async Task ConsumeInboundEvents(int maxQueuedMessages, TimeSpan maxInterval)
211213
{
212-
Options.InboundChannelStarted?.Invoke();
214+
Options.InboundChannelStartedCallback?.Invoke();
213215
while (await InboundBuffer.WaitToReadAsync(InChannel.Reader).ConfigureAwait(false))
214216
{
215217
if (TokenSource.Token.IsCancellationRequested) break;
@@ -223,7 +225,7 @@ private async Task ConsumeInboundEvents(int maxQueuedMessages, TimeSpan maxInter
223225
break;
224226
}
225227

226-
Options.PublishToOutboundChannel?.Invoke();
228+
Options.PublishToOutboundChannelCallback?.Invoke();
227229
if (InboundBuffer.NoThresholdsHit) continue;
228230

229231
//:w
@@ -234,24 +236,21 @@ private async Task ConsumeInboundEvents(int maxQueuedMessages, TimeSpan maxInter
234236

235237
if (await PublishAsync(outboundBuffer).ConfigureAwait(false))
236238
continue;
237-
238-
foreach (var e in InboundBuffer.Buffer)
239-
Options.PublishRejectionCallback?.Invoke(e);
239+
Options.PublishToOutboundChannelFailureCallback?.Invoke();
240240
}
241241
}
242242

243243
private ValueTask<bool> PublishAsync(IOutboundBuffer<TEvent> buffer)
244244
{
245245
async Task<bool> AsyncSlowPath(IOutboundBuffer<TEvent> b)
246246
{
247-
var maxRetries = Options.BufferOptions.MaxRetries;
247+
var maxRetries = Options.BufferOptions.ExportMaxRetries;
248248
for (var i = 0; i <= maxRetries; i++)
249249
while (await OutChannel.Writer.WaitToWriteAsync().ConfigureAwait(false))
250250
{
251251
if (OutChannel.Writer.TryWrite(b))
252252
return true;
253253
}
254-
Options.PublishToOutboundChannelFailure?.Invoke();
255254
return false;
256255
}
257256

src/Elastic.Channels/IWriteTrackingBuffer.cs renamed to src/Elastic.Channels/Buffers/IWriteTrackingBuffer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
using System;
66

7-
namespace Elastic.Channels;
7+
namespace Elastic.Channels.Buffers;
88

99
/// <summary>
1010
/// Represents a buffer that tracks the <see cref="DurationSinceFirstWrite"/> and it's current <see cref="Count"/>

src/Elastic.Channels/InboundBuffer.cs renamed to src/Elastic.Channels/Buffers/InboundBuffer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
using System.Threading.Channels;
99
using System.Threading.Tasks;
1010

11-
namespace Elastic.Channels;
11+
namespace Elastic.Channels.Buffers;
1212

1313
/// <summary>
1414
/// <see cref="InboundBuffer{TEvent}"/> is a buffer that will block <see cref="WaitToReadAsync"/> until

src/Elastic.Channels/OutboundBuffer.cs renamed to src/Elastic.Channels/Buffers/OutboundBuffer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
using System;
66
using System.Collections.Generic;
77

8-
namespace Elastic.Channels;
8+
namespace Elastic.Channels.Buffers;
99

1010
public interface IOutboundBuffer<out TEvent> : IWriteTrackingBuffer
1111
{

src/Elastic.Channels/ChannelOptionsBase.cs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.IO;
77
using System.Threading;
88
using System.Threading.Tasks;
9+
using Elastic.Channels.Buffers;
910

1011
namespace Elastic.Channels
1112
{
@@ -20,40 +21,40 @@ public abstract class ChannelOptionsBase<TEvent, TResponse>
2021

2122
public Func<Stream, CancellationToken, TEvent, Task> WriteEvent { get; set; } = null!;
2223

23-
/// <summary>
24-
/// If <see cref="Elastic.Channels.BufferOptions.MaxInFlightMessages"/> is reached, <see cref="TEvent"/>'s will fail to be published to the channel. You can be notified of dropped
25-
/// events with this callback
26-
/// </summary>
27-
public Action<TEvent>? PublishRejectionCallback { get; set; }
28-
29-
public Action<Exception>? ExceptionCallback { get; set; }
24+
/// <summary> Called if the call to <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.Export"/> throws. </summary>
25+
public Action<Exception>? ExportExceptionCallback { get; set; }
3026

27+
/// <summary> Called with (number of retries) (number of items to be exported) </summary>
3128
public Action<int, int>? ExportItemsAttemptCallback { get; set; }
3229

33-
/// <summary> Subscribe to be notified of events that are retryable but did not store correctly withing the boundaries of <see cref="Elastic.Channels.BufferOptions.MaxRetries"/></summary>
30+
/// <summary> Subscribe to be notified of events that are retryable but did not store correctly withing the boundaries of <see cref="Channels.BufferOptions.ExportMaxRetries"/></summary>
3431
public Action<IReadOnlyCollection<TEvent>>? ExportMaxRetriesCallback { get; set; }
3532

36-
/// <summary> Subscribe to be notified of events that are retryable but did not store correctly within the number of configured <see cref="Elastic.Channels.BufferOptions.MaxRetries"/></summary>
33+
/// <summary> Subscribe to be notified of events that are retryable but did not store correctly within the number of configured <see cref="Channels.BufferOptions.ExportMaxRetries"/></summary>
3734
public Action<IReadOnlyCollection<TEvent>>? ExportRetryCallback { get; set; }
3835

3936
/// <summary> A generic hook to be notified of any bulk request being initiated by <see cref="InboundBuffer{TEvent}"/> </summary>
4037
public Action<TResponse, IWriteTrackingBuffer>? ExportResponseCallback { get; set; }
4138

4239
/// <summary>Called everytime an event is written to the inbound channel </summary>
43-
public Action? PublishToInboundChannel { get; set; }
40+
public Action? PublishToInboundChannelCallback { get; set; }
4441

4542
/// <summary>Called everytime an event is not written to the inbound channel </summary>
46-
public Action? PublishToInboundChannelFailure { get; set; }
43+
public Action? PublishToInboundChannelFailureCallback { get; set; }
4744

4845
/// <summary>Called everytime the inbound channel publishes to the outbound channel. </summary>
49-
public Action? PublishToOutboundChannel { get; set; }
46+
public Action? PublishToOutboundChannelCallback { get; set; }
47+
48+
/// <summary> Called when the thread to read the outbound channel is started </summary>
49+
public Action? OutboundChannelStartedCallback { get; set; }
50+
/// <summary> Called when the thread to read the outbound channel has exited</summary>
51+
public Action? OutboundChannelExitedCallback { get; set; }
5052

51-
public Action? OutboundChannelStarted { get; set; }
52-
public Action? OutboundChannelExited { get; set; }
53-
public Action? InboundChannelStarted { get; set; }
53+
/// <summary> Called when the thread to read the inbound channel has started</summary>
54+
public Action? InboundChannelStartedCallback { get; set; }
5455

5556
/// <summary>Called everytime the inbound channel fails to publish to the outbound channel. </summary>
56-
public Action? PublishToOutboundChannelFailure { get; set; }
57+
public Action? PublishToOutboundChannelFailureCallback { get; set; }
5758
}
5859

5960
}

0 commit comments

Comments
 (0)