11using System ;
22using System . Collections . Generic ;
33using System . Linq ;
4+ using Elastic . Channels ;
45using Elastic . Channels . Buffers ;
56using Elastic . Channels . Diagnostics ;
67using Elastic . CommonSchema ;
@@ -44,7 +45,7 @@ public interface IElasticsearchSinkOptions
4445 /// <summary>
4546 /// Provides configuration options to <see cref="ElasticsearchSink"/> to control how and where data gets written
4647 /// </summary>
47- public class ElasticsearchSinkOptions : ElasticsearchSinkOptions < EcsDocument >
48+ public class ElasticsearchSinkOptions : ElasticsearchSinkOptions < LogEventEcsDocument >
4849 {
4950 /// <inheritdoc cref="ElasticsearchSinkOptions"/>
5051 public ElasticsearchSinkOptions ( ) { }
@@ -56,7 +57,7 @@ public ElasticsearchSinkOptions(ITransport transport) : base(transport) { }
5657 /// <inheritdoc cref="ElasticsearchSinkOptions{TEcsDocument}"/>
5758 public class ElasticsearchSinkOptions < TEcsDocument >
5859 : IElasticsearchSinkOptions
59- where TEcsDocument : EcsDocument , new ( )
60+ where TEcsDocument : LogEventEcsDocument , new ( )
6061 {
6162 /// <inheritdoc cref="ElasticsearchSinkOptions"/>
6263 public ElasticsearchSinkOptions ( ) : this ( new DistributedTransport ( TransportHelper . Default ( ) ) ) { }
@@ -110,18 +111,19 @@ public ElasticsearchSinkOptions() : this(new DistributedTransport(TransportHelpe
110111 /// <summary>
111112 /// This sink allows you to write serilog logs directly to Elasticsearch or Elastic Cloud
112113 /// </summary>
113- public class ElasticsearchSink : ElasticsearchSink < EcsDocument >
114+ public class ElasticsearchSink : ElasticsearchSink < LogEventEcsDocument >
114115 {
115116 /// <inheritdoc cref="ElasticsearchSink"/>>
116117 public ElasticsearchSink ( ElasticsearchSinkOptions options ) : base ( options ) { }
117118 }
118119
119120 /// <inheritdoc cref="ElasticsearchSink"/>>
120- public class ElasticsearchSink < TEcsDocument > : ILogEventSink , IDisposable
121- where TEcsDocument : EcsDocument , new ( )
121+ public class ElasticsearchSink < TEcsDocument > : ILogEventSink , IDisposable , ISetLoggingFailureListener
122+ where TEcsDocument : LogEventEcsDocument , new ( )
122123 {
123124 private readonly EcsTextFormatterConfiguration < TEcsDocument > _formatterConfiguration ;
124125 private readonly EcsDataStreamChannel < TEcsDocument > _channel ;
126+ private ILoggingFailureListener _failureListener = SelfLog . FailureListener ;
125127
126128 /// <inheritdoc cref="IElasticsearchSinkOptions"/>
127129 public IElasticsearchSinkOptions Options { get ; }
@@ -133,7 +135,9 @@ public ElasticsearchSink(ElasticsearchSinkOptions<TEcsDocument> options)
133135 _formatterConfiguration = options . TextFormatting ;
134136 var channelOptions = new DataStreamChannelOptions < TEcsDocument > ( options . Transport )
135137 {
136- DataStream = options . DataStream
138+ DataStream = options . DataStream ,
139+ ExportMaxRetriesCallback = EmitExportFailures
140+
137141 } ;
138142 options . ConfigureChannel ? . Invoke ( channelOptions ) ;
139143 _channel = new EcsDataStreamChannel < TEcsDocument > ( channelOptions , new [ ] { new SelfLogCallbackListener < TEcsDocument > ( options ) } ) ;
@@ -142,18 +146,46 @@ public ElasticsearchSink(ElasticsearchSinkOptions<TEcsDocument> options)
142146 _channel . BootstrapElasticsearch ( options . BootstrapMethod , options . IlmPolicy ) ;
143147 }
144148
149+ private void EmitExportFailures ( IReadOnlyCollection < TEcsDocument > documents )
150+ {
151+ var logs = documents
152+ . Select ( d => d . LogEvent )
153+ . ToArray ( ) ;
154+ _failureListener . OnLoggingFailed (
155+ this ,
156+ LoggingFailureKind . Temporary ,
157+ "Failure to export events over to Elasticsearch." ,
158+ logs ,
159+ exception : null
160+ ) ;
161+ }
162+
145163 /// <inheritdoc cref="ILogEventSink.Emit"/>
146164 public void Emit ( LogEvent logEvent )
147165 {
148166 var ecsDoc = LogEventConverter . ConvertToEcs ( logEvent , _formatterConfiguration ) ;
149- _channel . TryWrite ( ecsDoc ) ;
167+ ecsDoc . LogEvent = logEvent ;
168+ if ( ! _channel . TryWrite ( ecsDoc ) )
169+ {
170+ _failureListener . OnLoggingFailed (
171+ this ,
172+ LoggingFailureKind . Temporary ,
173+ "Failure to push event over the channel." ,
174+ [ logEvent ] ,
175+ exception : null
176+ ) ;
177+ }
150178 }
151179
152180 /// <summary> Disposes and flushed <see cref="EcsDataStreamChannel{TEcsDocument}"/> </summary>
153181 public void Dispose ( ) => _channel . Dispose ( ) ;
182+
183+ void ISetLoggingFailureListener . SetFailureListener ( ILoggingFailureListener failureListener ) =>
184+ _failureListener = failureListener ?? throw new ArgumentNullException ( nameof ( failureListener ) ) ;
154185 }
155186
156- internal class SelfLogCallbackListener < TEcsDocument > : IChannelCallbacks < TEcsDocument , BulkResponse > where TEcsDocument : EcsDocument , new ( )
187+ internal class SelfLogCallbackListener < TEcsDocument > : IChannelCallbacks < TEcsDocument , BulkResponse >
188+ where TEcsDocument : LogEventEcsDocument , new ( )
157189 {
158190 public Action < Exception > ? ExportExceptionCallback { get ; }
159191 public Action < BulkResponse , IWriteTrackingBuffer > ? ExportResponseCallback { get ; }
0 commit comments