Skip to content

Commit 6dc8ff9

Browse files
authored
Ensure we track retry attempts per request (#146)
As the pipeline is now created once per transport, we need to track the retires outside of the pipeline. It also fixes two other bugs where we don't null check correctly. Closes #145
1 parent 1bd2db0 commit 6dc8ff9

File tree

7 files changed

+181
-39
lines changed

7 files changed

+181
-39
lines changed

src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ private RequestConfiguration PingAndSniffRequestConfiguration
6464
}
6565
}
6666

67-
private bool DepletedRetries(DateTimeOffset startedOn) => Retried >= MaxRetries + 1 || IsTakingTooLong(startedOn);
67+
private bool DepletedRetries(DateTimeOffset startedOn, int attemptedNodes) => attemptedNodes >= MaxRetries + 1 || IsTakingTooLong(startedOn);
6868

6969
private bool FirstPoolUsageNeedsSniffing =>
7070
!RequestDisabledSniff
@@ -87,8 +87,6 @@ private bool IsTakingTooLong(DateTimeOffset startedOn)
8787

8888
private bool Refresh { get; set; }
8989

90-
private int Retried { get; set; }
91-
9290
private IEnumerable<Node> SniffNodes(Auditor? auditor) => _nodePool
9391
.CreateView(auditor)
9492
.ToList()
@@ -204,6 +202,7 @@ private async ValueTask<TResponse> CallProductEndpointCoreAsync<TResponse>(
204202
Endpoint endpoint,
205203
Auditor? auditor,
206204
DateTimeOffset startedOn,
205+
int attemptedNodes,
207206
List<PipelineException>? seenExceptions
208207
)
209208
where TResponse : TransportResponse, new()
@@ -231,15 +230,15 @@ private async ValueTask<TResponse> CallProductEndpointCoreAsync<TResponse>(
231230
auditor?.Emit(MaxTimeoutReached);
232231
exceptionMessage = "Maximum timeout reached while retrying request";
233232
}
234-
else if (Retried >= MaxRetries && MaxRetries > 0)
233+
else if (attemptedNodes >= MaxRetries && MaxRetries > 0)
235234
{
236235
pipelineFailure = PipelineFailure.MaxRetriesReached;
237236
auditor?.Emit(MaxRetriesReached);
238237
exceptionMessage = "Maximum number of retries reached";
239238

240239
var now = _dateTimeProvider.Now();
241240
var activeNodes = _nodePool.Nodes.Count(n => n.IsAlive || n.DeadUntil <= now);
242-
if (Retried >= activeNodes)
241+
if (attemptedNodes >= activeNodes)
243242
{
244243
auditor?.Emit(FailedOverAllNodes);
245244
exceptionMessage += ", failed over to all the known alive nodes before failing";
@@ -336,7 +335,6 @@ public void MarkDead(Node node)
336335
{
337336
var deadUntil = _dateTimeProvider.DeadTime(node.FailedAttempts, _settings.DeadTimeout, _settings.MaxDeadTimeout);
338337
node.MarkDead(deadUntil);
339-
Retried++;
340338
}
341339

342340
/// Fast path for <see cref="NextNode"/> if only a single node could ever be yielded this save an IEnumerator allocation
@@ -355,12 +353,11 @@ public bool TryGetSingleNode(out Node? node)
355353
}
356354

357355
/// returns a consistent enumerable view into the available nodes
358-
public IEnumerable<Node> NextNode(DateTimeOffset startedOn, Auditor? auditor)
356+
public IEnumerable<Node> NextNode(DateTimeOffset startedOn, int attemptedNodes, Auditor? auditor)
359357
{
360358
if (_boundConfiguration.ForceNode != null)
361359
{
362360
yield return new Node(_boundConfiguration.ForceNode);
363-
364361
yield break;
365362
}
366363

@@ -370,11 +367,11 @@ public IEnumerable<Node> NextNode(DateTimeOffset startedOn, Auditor? auditor)
370367
var refreshed = false;
371368
for (var i = 0; i < 100; i++)
372369
{
373-
if (DepletedRetries(startedOn)) yield break;
370+
if (DepletedRetries(startedOn, attemptedNodes)) yield break;
374371

375372
foreach (var node in _nodePool.CreateView(auditor))
376373
{
377-
if (DepletedRetries(startedOn)) break;
374+
if (DepletedRetries(startedOn, attemptedNodes)) break;
378375

379376
if (!_nodePredicate(node)) continue;
380377

src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,6 @@ internal HttpRequestInvoker(ResponseFactory responseFactory)
5151
public HttpRequestInvoker(Func<HttpMessageHandler, BoundConfiguration, HttpMessageHandler> wrappingHandler) :
5252
this(wrappingHandler, new DefaultResponseFactory()) { }
5353

54-
/// <summary>
55-
/// Allows consumers to inject their own HttpMessageHandler, and optionally call our default implementation.
56-
/// </summary>
57-
public HttpRequestInvoker(Func<HttpMessageHandler, BoundConfiguration, HttpMessageHandler> wrappingHandler, ITransportConfiguration transportConfiguration) :
58-
this(wrappingHandler, new DefaultResponseFactory())
59-
{ }
60-
6154
internal HttpRequestInvoker(Func<HttpMessageHandler, BoundConfiguration, HttpMessageHandler> wrappingHandler, ResponseFactory responseFactory)
6255
{
6356
ResponseFactory = responseFactory;
@@ -203,16 +196,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
203196
receivedResponse?.Dispose();
204197
}
205198

206-
if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false)))
207-
return response;
208-
209-
var attributes = boundConfiguration.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);
210-
211-
if (attributes is null) return response;
212-
213-
foreach (var attribute in attributes)
214-
Activity.Current?.SetTag(attribute.Key, attribute.Value);
215-
199+
RequestInvokerHelpers.SetOtelAttributes(boundConfiguration, response);
216200
return response;
217201
}
218202
catch

src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -194,14 +194,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
194194
receivedResponse?.Dispose();
195195
}
196196

197-
if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
198-
{
199-
var attributes = boundConfiguration.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);
200-
foreach (var attribute in attributes)
201-
{
202-
Activity.Current?.SetTag(attribute.Key, attribute.Value);
203-
}
204-
}
197+
RequestInvokerHelpers.SetOtelAttributes(boundConfiguration, response);
205198

206199
return response;
207200
}
@@ -421,7 +414,7 @@ protected virtual void SetProxyIfNeeded(HttpWebRequest request, BoundConfigurati
421414
protected virtual void SetAuthenticationIfNeeded(Endpoint endpoint, BoundConfiguration boundConfiguration, HttpWebRequest request)
422415
{
423416
//If user manually specifies an Authorization Header give it preference
424-
if (boundConfiguration.Headers.HasKeys() && boundConfiguration.Headers.AllKeys.Contains("Authorization"))
417+
if (boundConfiguration.Headers is not null && boundConfiguration.Headers.HasKeys() && boundConfiguration.Headers.AllKeys.Contains("Authorization"))
425418
{
426419
var header = boundConfiguration.Headers["Authorization"];
427420
request.Headers["Authorization"] = header;
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using System.Diagnostics;
6+
using Elastic.Transport.Diagnostics;
7+
8+
namespace Elastic.Transport;
9+
10+
internal static class RequestInvokerHelpers
11+
{
12+
public static void SetOtelAttributes<TResponse>(BoundConfiguration boundConfiguration, TResponse response) where TResponse : TransportResponse
13+
{
14+
if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false)))
15+
return;
16+
17+
var attributes = boundConfiguration.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);
18+
19+
if (attributes is null) return;
20+
21+
foreach (var attribute in attributes)
22+
Activity.Current?.SetTag(attribute.Key, attribute.Value);
23+
}
24+
}

src/Elastic.Transport/DistributedTransport.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
169169
}
170170
else
171171
{
172-
foreach (var node in pipeline.NextNode(startedOn, auditor))
172+
foreach (var node in pipeline.NextNode(startedOn, attemptedNodes, auditor))
173173
{
174174
attemptedNodes++;
175175
endpoint = endpoint with { Node = node };
@@ -265,7 +265,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
265265
if (activity is { IsAllDataRequested: true })
266266
OpenTelemetry.SetCommonAttributes(activity, Configuration);
267267

268-
return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, auditor, seenExceptions, response);
268+
return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, attemptedNodes, auditor, seenExceptions, response);
269269
}
270270
finally
271271
{
@@ -303,6 +303,7 @@ private TResponse FinalizeResponse<TResponse>(
303303
PostData? postData,
304304
RequestPipeline pipeline,
305305
DateTimeOffset startedOn,
306+
int attemptedNodes,
306307
Auditor auditor,
307308
List<PipelineException>? seenExceptions,
308309
TResponse? response
@@ -312,7 +313,7 @@ private TResponse FinalizeResponse<TResponse>(
312313
pipeline.ThrowNoNodesAttempted(endpoint, auditor, seenExceptions);
313314

314315
var callDetails = GetMostRecentCallDetails(response, seenExceptions);
315-
var clientException = pipeline.CreateClientException(response, callDetails, endpoint, auditor, startedOn, seenExceptions);
316+
var clientException = pipeline.CreateClientException(response, callDetails, endpoint, auditor, startedOn, attemptedNodes, seenExceptions);
316317

317318
if (response?.ApiCallDetails == null)
318319
pipeline.BadResponse(ref response, callDetails, endpoint, boundConfiguration, postData, clientException, auditor);
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using Xunit;
6+
using System;
7+
using FluentAssertions;
8+
using System.Linq;
9+
10+
namespace Elastic.Transport.Tests.Components.NodePool
11+
{
12+
public class StaticNodePoolTests
13+
{
14+
[Fact]
15+
public void MultipleRequests_WhenOnlyASingleEndpointIsConfigured_AndTheEndpointIsUnavailable_DoNotThrowAnException()
16+
{
17+
Node[] nodes = [new Uri("http://localhost:9200")];
18+
var pool = new StaticNodePool(nodes);
19+
var transport = new DistributedTransport(new TransportConfiguration(pool));
20+
21+
var response = transport.Request<StringResponse>(HttpMethod.GET, "/", null, null);
22+
23+
response.ApiCallDetails.SuccessOrKnownError.Should().BeFalse();
24+
response.ApiCallDetails.AuditTrail.Count.Should().Be(1);
25+
26+
var audit = response.ApiCallDetails.AuditTrail.First();
27+
audit.Event.Should().Be(Diagnostics.Auditing.AuditEvent.BadRequest);
28+
audit.Node.FailedAttempts.Should().Be(1);
29+
audit.Node.IsAlive.Should().BeFalse();
30+
31+
response = transport.Request<StringResponse>(HttpMethod.GET, "/", null, null);
32+
33+
response.ApiCallDetails.SuccessOrKnownError.Should().BeFalse();
34+
35+
var eventCount = 0;
36+
37+
foreach (var a in response.ApiCallDetails.AuditTrail)
38+
{
39+
eventCount++;
40+
41+
if (eventCount == 1)
42+
{
43+
a.Event.Should().Be(Diagnostics.Auditing.AuditEvent.AllNodesDead);
44+
}
45+
46+
if (eventCount == 2)
47+
{
48+
a.Event.Should().Be(Diagnostics.Auditing.AuditEvent.Resurrection);
49+
}
50+
51+
if (eventCount == 3)
52+
{
53+
a.Event.Should().Be(Diagnostics.Auditing.AuditEvent.BadRequest);
54+
audit.Node.FailedAttempts.Should().Be(2);
55+
audit.Node.IsAlive.Should().BeFalse();
56+
}
57+
}
58+
}
59+
}
60+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using Xunit;
6+
using System;
7+
using FluentAssertions;
8+
using System.Threading.Tasks;
9+
using System.Threading;
10+
using System.Diagnostics;
11+
using System.Collections.Generic;
12+
using System.IO;
13+
using Elastic.Transport.Diagnostics;
14+
using System.Net.NetworkInformation;
15+
16+
namespace Elastic.Transport.Tests.Components.TransportClient
17+
{
18+
public class RequestInvokerTests
19+
{
20+
[Fact]
21+
public void NoExceptionShouldBeThrown_WhenHttpResponseDoesNotIncludeCloudHeaders()
22+
{
23+
// This test validates that if `ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails` returns null,
24+
// no exception is thrown and attributes are not set.
25+
26+
using var listener = new ActivityListener
27+
{
28+
ActivityStarted = _ => { },
29+
ActivityStopped = activity => { },
30+
ShouldListenTo = activitySource => activitySource.Name == OpenTelemetry.ElasticTransportActivitySourceName,
31+
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData
32+
};
33+
ActivitySource.AddActivityListener(listener);
34+
35+
var requestInvoker = new HttpRequestInvoker(new TestResponseFactory());
36+
var pool = new SingleNodePool(new Uri("http://localhost:9200"));
37+
var config = new TransportConfiguration(pool, requestInvoker);
38+
var transport = new DistributedTransport(config);
39+
40+
var response = transport.Head("/");
41+
response.ApiCallDetails.HttpStatusCode.Should().Be(200);
42+
}
43+
44+
private sealed class TestResponseFactory : ResponseFactory
45+
{
46+
public override TResponse Create<TResponse>(
47+
Endpoint endpoint,
48+
BoundConfiguration boundConfiguration,
49+
PostData postData,
50+
Exception ex,
51+
int? statusCode,
52+
Dictionary<string, IEnumerable<string>> headers,
53+
Stream responseStream,
54+
string contentType,
55+
long contentLength,
56+
IReadOnlyDictionary<string, ThreadPoolStatistics> threadPoolStats,
57+
IReadOnlyDictionary<TcpState, int> tcpStats) => CreateResponse<TResponse>();
58+
59+
public override Task<TResponse> CreateAsync<TResponse>(
60+
Endpoint endpoint,
61+
BoundConfiguration boundConfiguration,
62+
PostData postData,
63+
Exception ex,
64+
int? statusCode,
65+
Dictionary<string, IEnumerable<string>> headers,
66+
Stream responseStream,
67+
string contentType,
68+
long contentLength,
69+
IReadOnlyDictionary<string, ThreadPoolStatistics> threadPoolStats,
70+
IReadOnlyDictionary<TcpState, int> tcpStats,
71+
CancellationToken cancellationToken = default)
72+
{
73+
var response = CreateResponse<TResponse>();
74+
return Task.FromResult(response);
75+
}
76+
77+
private static TResponse CreateResponse<TResponse>() where TResponse : TransportResponse, new() => new TResponse
78+
{
79+
ApiCallDetails = new() { HttpStatusCode = 200, Uri = new Uri("http://localhost/") }
80+
};
81+
}
82+
}
83+
}

0 commit comments

Comments
 (0)