2020namespace Elastic . Transport ;
2121
2222/// <inheritdoc cref="ITransport{TConfiguration}" />
23- public sealed class DistributedTransport : DistributedTransport < ITransportConfiguration >
23+ /// <summary>
24+ /// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on
25+ /// different nodes
26+ /// </summary>
27+ /// <param name="configuration">The configuration to use for this transport</param>
28+ public sealed class DistributedTransport ( ITransportConfiguration configuration ) : DistributedTransport < ITransportConfiguration > ( configuration )
2429{
25- /// <summary>
26- /// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on
27- /// different nodes
28- /// </summary>
29- /// <param name="configuration">The configuration to use for this transport</param>
30- public DistributedTransport ( ITransportConfiguration configuration )
31- : base ( configuration ) { }
3230}
3331
3432/// <inheritdoc cref="ITransport{TConfiguration}" />
@@ -65,36 +63,36 @@ public DistributedTransport(TConfiguration configuration)
6563 public TResponse Request < TResponse > (
6664 in EndpointPath path ,
6765 PostData ? data ,
68- in OpenTelemetryData openTelemetryData ,
66+ Action < Activity > ? configureActivity ,
6967 IRequestConfiguration ? localConfiguration
7068 ) where TResponse : TransportResponse , new ( ) =>
71- RequestCoreAsync < TResponse > ( isAsync : false , path , data , openTelemetryData , localConfiguration )
69+ RequestCoreAsync < TResponse > ( isAsync : false , path , data , configureActivity , localConfiguration )
7270 . EnsureCompleted ( ) ;
7371
7472 /// <inheritdoc cref="ITransport.RequestAsync{TResponse}"/>
7573 public Task < TResponse > RequestAsync < TResponse > (
7674 in EndpointPath path ,
7775 PostData ? data ,
78- in OpenTelemetryData openTelemetryData ,
76+ Action < Activity > ? configureActivity ,
7977 IRequestConfiguration ? localConfiguration ,
8078 CancellationToken cancellationToken = default
8179 ) where TResponse : TransportResponse , new ( ) =>
82- RequestCoreAsync < TResponse > ( isAsync : true , path , data , openTelemetryData , localConfiguration , cancellationToken )
80+ RequestCoreAsync < TResponse > ( isAsync : true , path , data , configureActivity , localConfiguration , cancellationToken )
8381 . AsTask ( ) ;
8482
8583 private async ValueTask < TResponse > RequestCoreAsync < TResponse > (
8684 bool isAsync ,
8785 EndpointPath path ,
8886 PostData ? data ,
89- OpenTelemetryData openTelemetryData ,
87+ Action < Activity > ? configureActivity ,
9088 IRequestConfiguration ? localConfiguration ,
9189 CancellationToken cancellationToken = default
9290 ) where TResponse : TransportResponse , new ( )
9391 {
9492 Activity activity = null ;
9593
9694 if ( OpenTelemetry . ElasticTransportActivitySource . HasListeners ( ) )
97- activity = OpenTelemetry . ElasticTransportActivitySource . StartActivity ( openTelemetryData . SpanName ?? path . Method . GetStringValue ( ) ,
95+ activity = OpenTelemetry . ElasticTransportActivitySource . StartActivity ( path . Method . GetStringValue ( ) ,
9896 ActivityKind . Client ) ;
9997
10098 try
@@ -127,7 +125,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
127125 if ( activity is { IsAllDataRequested : true } )
128126 {
129127 if ( activity . IsAllDataRequested )
130- OpenTelemetry . SetCommonAttributes ( activity , openTelemetryData , Configuration ) ;
128+ OpenTelemetry . SetCommonAttributes ( activity , Configuration ) ;
131129
132130 if ( Configuration . Authentication is BasicAuthentication basicAuthentication )
133131 activity . SetTag ( SemanticConventions . DbUser , basicAuthentication . Username ) ;
@@ -136,11 +134,6 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
136134 activity . SetTag ( OpenTelemetryAttributes . ElasticTransportProductVersion , Configuration . ProductRegistration . ProductAssemblyVersion ) ;
137135 activity . SetTag ( OpenTelemetryAttributes . ElasticTransportVersion , ReflectionVersionInfo . TransportVersion ) ;
138136 activity . SetTag ( SemanticConventions . UserAgentOriginal , Configuration . UserAgent . ToString ( ) ) ;
139-
140- if ( openTelemetryData . SpanAttributes is not null )
141- foreach ( var attribute in openTelemetryData . SpanAttributes )
142- activity . SetTag ( attribute . Key , attribute . Value ) ;
143-
144137 activity . SetTag ( SemanticConventions . HttpRequestMethod , endpoint . Method . GetStringValue ( ) ) ;
145138 }
146139
@@ -268,6 +261,9 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
268261 activity ? . SetTag ( SemanticConventions . HttpResponseStatusCode , response . ApiCallDetails . HttpStatusCode ) ;
269262 activity ? . SetTag ( OpenTelemetryAttributes . ElasticTransportAttemptedNodes , attemptedNodes ) ;
270263
264+ if ( configureActivity is not null && activity is not null )
265+ configureActivity . Invoke ( activity ) ;
266+
271267 return FinalizeResponse ( endpoint , boundConfiguration , data , pipeline , startedOn , auditor , seenExceptions , response ) ;
272268 }
273269 finally
0 commit comments