diff --git a/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs b/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs index d981df6..a3b73b0 100644 --- a/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs +++ b/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs @@ -64,7 +64,7 @@ private RequestConfiguration PingAndSniffRequestConfiguration } } - private bool DepletedRetries(DateTimeOffset startedOn) => Retried >= MaxRetries + 1 || IsTakingTooLong(startedOn); + private bool DepletedRetries(DateTimeOffset startedOn, int attemptedNodes) => attemptedNodes >= MaxRetries + 1 || IsTakingTooLong(startedOn); private bool FirstPoolUsageNeedsSniffing => !RequestDisabledSniff @@ -87,8 +87,6 @@ private bool IsTakingTooLong(DateTimeOffset startedOn) private bool Refresh { get; set; } - private int Retried { get; set; } - private IEnumerable SniffNodes(Auditor? auditor) => _nodePool .CreateView(auditor) .ToList() @@ -204,6 +202,7 @@ private async ValueTask CallProductEndpointCoreAsync( Endpoint endpoint, Auditor? auditor, DateTimeOffset startedOn, + int attemptedNodes, List? seenExceptions ) where TResponse : TransportResponse, new() @@ -231,7 +230,7 @@ private async ValueTask CallProductEndpointCoreAsync( auditor?.Emit(MaxTimeoutReached); exceptionMessage = "Maximum timeout reached while retrying request"; } - else if (Retried >= MaxRetries && MaxRetries > 0) + else if (attemptedNodes >= MaxRetries && MaxRetries > 0) { pipelineFailure = PipelineFailure.MaxRetriesReached; auditor?.Emit(MaxRetriesReached); @@ -239,7 +238,7 @@ private async ValueTask CallProductEndpointCoreAsync( var now = _dateTimeProvider.Now(); var activeNodes = _nodePool.Nodes.Count(n => n.IsAlive || n.DeadUntil <= now); - if (Retried >= activeNodes) + if (attemptedNodes >= activeNodes) { auditor?.Emit(FailedOverAllNodes); exceptionMessage += ", failed over to all the known alive nodes before failing"; @@ -336,7 +335,6 @@ public void MarkDead(Node node) { var deadUntil = _dateTimeProvider.DeadTime(node.FailedAttempts, _settings.DeadTimeout, _settings.MaxDeadTimeout); node.MarkDead(deadUntil); - Retried++; } /// Fast path for if only a single node could ever be yielded this save an IEnumerator allocation @@ -355,12 +353,11 @@ public bool TryGetSingleNode(out Node? node) } /// returns a consistent enumerable view into the available nodes - public IEnumerable NextNode(DateTimeOffset startedOn, Auditor? auditor) + public IEnumerable NextNode(DateTimeOffset startedOn, int attemptedNodes, Auditor? auditor) { if (_boundConfiguration.ForceNode != null) { yield return new Node(_boundConfiguration.ForceNode); - yield break; } @@ -370,11 +367,11 @@ public IEnumerable NextNode(DateTimeOffset startedOn, Auditor? auditor) var refreshed = false; for (var i = 0; i < 100; i++) { - if (DepletedRetries(startedOn)) yield break; + if (DepletedRetries(startedOn, attemptedNodes)) yield break; foreach (var node in _nodePool.CreateView(auditor)) { - if (DepletedRetries(startedOn)) break; + if (DepletedRetries(startedOn, attemptedNodes)) break; if (!_nodePredicate(node)) continue; diff --git a/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs b/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs index fb743c1..779267f 100644 --- a/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs +++ b/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs @@ -51,13 +51,6 @@ internal HttpRequestInvoker(ResponseFactory responseFactory) public HttpRequestInvoker(Func wrappingHandler) : this(wrappingHandler, new DefaultResponseFactory()) { } - /// - /// Allows consumers to inject their own HttpMessageHandler, and optionally call our default implementation. - /// - public HttpRequestInvoker(Func wrappingHandler, ITransportConfiguration transportConfiguration) : - this(wrappingHandler, new DefaultResponseFactory()) - { } - internal HttpRequestInvoker(Func wrappingHandler, ResponseFactory responseFactory) { ResponseFactory = responseFactory; @@ -203,16 +196,7 @@ private async ValueTask RequestCoreAsync(bool isAsync, End receivedResponse?.Dispose(); } - if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false))) - return response; - - var attributes = boundConfiguration.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails); - - if (attributes is null) return response; - - foreach (var attribute in attributes) - Activity.Current?.SetTag(attribute.Key, attribute.Value); - + RequestInvokerHelpers.SetOtelAttributes(boundConfiguration, response); return response; } catch diff --git a/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs b/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs index c20315c..c2f99bf 100644 --- a/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs +++ b/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs @@ -194,14 +194,7 @@ private async ValueTask RequestCoreAsync(bool isAsync, End receivedResponse?.Dispose(); } - if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false)) - { - var attributes = boundConfiguration.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails); - foreach (var attribute in attributes) - { - Activity.Current?.SetTag(attribute.Key, attribute.Value); - } - } + RequestInvokerHelpers.SetOtelAttributes(boundConfiguration, response); return response; } @@ -421,7 +414,7 @@ protected virtual void SetProxyIfNeeded(HttpWebRequest request, BoundConfigurati protected virtual void SetAuthenticationIfNeeded(Endpoint endpoint, BoundConfiguration boundConfiguration, HttpWebRequest request) { //If user manually specifies an Authorization Header give it preference - if (boundConfiguration.Headers.HasKeys() && boundConfiguration.Headers.AllKeys.Contains("Authorization")) + if (boundConfiguration.Headers is not null && boundConfiguration.Headers.HasKeys() && boundConfiguration.Headers.AllKeys.Contains("Authorization")) { var header = boundConfiguration.Headers["Authorization"]; request.Headers["Authorization"] = header; diff --git a/src/Elastic.Transport/Components/TransportClient/RequestInvokerHelpers.cs b/src/Elastic.Transport/Components/TransportClient/RequestInvokerHelpers.cs new file mode 100644 index 0000000..a1d0ec8 --- /dev/null +++ b/src/Elastic.Transport/Components/TransportClient/RequestInvokerHelpers.cs @@ -0,0 +1,24 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Diagnostics; +using Elastic.Transport.Diagnostics; + +namespace Elastic.Transport; + +internal static class RequestInvokerHelpers +{ + public static void SetOtelAttributes(BoundConfiguration boundConfiguration, TResponse response) where TResponse : TransportResponse + { + if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false))) + return; + + var attributes = boundConfiguration.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails); + + if (attributes is null) return; + + foreach (var attribute in attributes) + Activity.Current?.SetTag(attribute.Key, attribute.Value); + } +} diff --git a/src/Elastic.Transport/DistributedTransport.cs b/src/Elastic.Transport/DistributedTransport.cs index 8364fd4..5a2154d 100644 --- a/src/Elastic.Transport/DistributedTransport.cs +++ b/src/Elastic.Transport/DistributedTransport.cs @@ -169,7 +169,7 @@ private async ValueTask RequestCoreAsync( } else { - foreach (var node in pipeline.NextNode(startedOn, auditor)) + foreach (var node in pipeline.NextNode(startedOn, attemptedNodes, auditor)) { attemptedNodes++; endpoint = endpoint with { Node = node }; @@ -265,7 +265,7 @@ private async ValueTask RequestCoreAsync( if (activity is { IsAllDataRequested: true }) OpenTelemetry.SetCommonAttributes(activity, Configuration); - return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, auditor, seenExceptions, response); + return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, attemptedNodes, auditor, seenExceptions, response); } finally { @@ -303,6 +303,7 @@ private TResponse FinalizeResponse( PostData? postData, RequestPipeline pipeline, DateTimeOffset startedOn, + int attemptedNodes, Auditor auditor, List? seenExceptions, TResponse? response @@ -312,7 +313,7 @@ private TResponse FinalizeResponse( pipeline.ThrowNoNodesAttempted(endpoint, auditor, seenExceptions); var callDetails = GetMostRecentCallDetails(response, seenExceptions); - var clientException = pipeline.CreateClientException(response, callDetails, endpoint, auditor, startedOn, seenExceptions); + var clientException = pipeline.CreateClientException(response, callDetails, endpoint, auditor, startedOn, attemptedNodes, seenExceptions); if (response?.ApiCallDetails == null) pipeline.BadResponse(ref response, callDetails, endpoint, boundConfiguration, postData, clientException, auditor); diff --git a/tests/Elastic.Transport.Tests/Components/NodePool/StaticNodePoolTests.cs b/tests/Elastic.Transport.Tests/Components/NodePool/StaticNodePoolTests.cs new file mode 100644 index 0000000..4b18c4f --- /dev/null +++ b/tests/Elastic.Transport.Tests/Components/NodePool/StaticNodePoolTests.cs @@ -0,0 +1,60 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Xunit; +using System; +using FluentAssertions; +using System.Linq; + +namespace Elastic.Transport.Tests.Components.NodePool +{ + public class StaticNodePoolTests + { + [Fact] + public void MultipleRequests_WhenOnlyASingleEndpointIsConfigured_AndTheEndpointIsUnavailable_DoNotThrowAnException() + { + Node[] nodes = [new Uri("http://localhost:9200")]; + var pool = new StaticNodePool(nodes); + var transport = new DistributedTransport(new TransportConfiguration(pool)); + + var response = transport.Request(HttpMethod.GET, "/", null, null); + + response.ApiCallDetails.SuccessOrKnownError.Should().BeFalse(); + response.ApiCallDetails.AuditTrail.Count.Should().Be(1); + + var audit = response.ApiCallDetails.AuditTrail.First(); + audit.Event.Should().Be(Diagnostics.Auditing.AuditEvent.BadRequest); + audit.Node.FailedAttempts.Should().Be(1); + audit.Node.IsAlive.Should().BeFalse(); + + response = transport.Request(HttpMethod.GET, "/", null, null); + + response.ApiCallDetails.SuccessOrKnownError.Should().BeFalse(); + + var eventCount = 0; + + foreach (var a in response.ApiCallDetails.AuditTrail) + { + eventCount++; + + if (eventCount == 1) + { + a.Event.Should().Be(Diagnostics.Auditing.AuditEvent.AllNodesDead); + } + + if (eventCount == 2) + { + a.Event.Should().Be(Diagnostics.Auditing.AuditEvent.Resurrection); + } + + if (eventCount == 3) + { + a.Event.Should().Be(Diagnostics.Auditing.AuditEvent.BadRequest); + audit.Node.FailedAttempts.Should().Be(2); + audit.Node.IsAlive.Should().BeFalse(); + } + } + } + } +} diff --git a/tests/Elastic.Transport.Tests/Components/TransportClient/RequestInvokerTests.cs b/tests/Elastic.Transport.Tests/Components/TransportClient/RequestInvokerTests.cs new file mode 100644 index 0000000..2935189 --- /dev/null +++ b/tests/Elastic.Transport.Tests/Components/TransportClient/RequestInvokerTests.cs @@ -0,0 +1,83 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using Xunit; +using System; +using FluentAssertions; +using System.Threading.Tasks; +using System.Threading; +using System.Diagnostics; +using System.Collections.Generic; +using System.IO; +using Elastic.Transport.Diagnostics; +using System.Net.NetworkInformation; + +namespace Elastic.Transport.Tests.Components.TransportClient +{ + public class RequestInvokerTests + { + [Fact] + public void NoExceptionShouldBeThrown_WhenHttpResponseDoesNotIncludeCloudHeaders() + { + // This test validates that if `ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails` returns null, + // no exception is thrown and attributes are not set. + + using var listener = new ActivityListener + { + ActivityStarted = _ => { }, + ActivityStopped = activity => { }, + ShouldListenTo = activitySource => activitySource.Name == OpenTelemetry.ElasticTransportActivitySourceName, + Sample = (ref ActivityCreationOptions _) => ActivitySamplingResult.AllData + }; + ActivitySource.AddActivityListener(listener); + + var requestInvoker = new HttpRequestInvoker(new TestResponseFactory()); + var pool = new SingleNodePool(new Uri("http://localhost:9200")); + var config = new TransportConfiguration(pool, requestInvoker); + var transport = new DistributedTransport(config); + + var response = transport.Head("/"); + response.ApiCallDetails.HttpStatusCode.Should().Be(200); + } + + private sealed class TestResponseFactory : ResponseFactory + { + public override TResponse Create( + Endpoint endpoint, + BoundConfiguration boundConfiguration, + PostData postData, + Exception ex, + int? statusCode, + Dictionary> headers, + Stream responseStream, + string contentType, + long contentLength, + IReadOnlyDictionary threadPoolStats, + IReadOnlyDictionary tcpStats) => CreateResponse(); + + public override Task CreateAsync( + Endpoint endpoint, + BoundConfiguration boundConfiguration, + PostData postData, + Exception ex, + int? statusCode, + Dictionary> headers, + Stream responseStream, + string contentType, + long contentLength, + IReadOnlyDictionary threadPoolStats, + IReadOnlyDictionary tcpStats, + CancellationToken cancellationToken = default) + { + var response = CreateResponse(); + return Task.FromResult(response); + } + + private static TResponse CreateResponse() where TResponse : TransportResponse, new() => new TResponse + { + ApiCallDetails = new() { HttpStatusCode = 200, Uri = new Uri("http://localhost/") } + }; + } + } +}