Skip to content

Commit 95ea6c6

Browse files
committed
Refactor pipeline and node pool components
Refactor to decouple date provider and introduce Auditor usage. Renamed and altered functions within pipeline components to utilize the `Auditor` class, improving flexibility and modularity. Removed the embedded `DateTimeProvider` instance from several classes and ensured that such dependencies are injected or fetched through associated components like the node pools. This change enhances monitoring and logging capabilities during request processing. Adjust time tolerance in TransportConfigurationTests Increased the time tolerance for the 'LastUpdate' field comparison from 100 milliseconds to 2 seconds. This change enhances the reliability of the test by accommodating larger variations in timing. (cherry picked from commit 14207cb) Simplify RequestPipeline and reuse a singleton instance if we can Add DateTimeProvider and RequestPipelineFactory properties This commit introduces `DateTimeProvider` and `RequestPipelineFactory` properties to the transport configuration. The changes ensure that these properties are properly initialized and accessed throughout various components, enhancing configurability and testability of date and request pipeline behaviors.
1 parent 5c18b96 commit 95ea6c6

32 files changed

+778
-865
lines changed

src/Elastic.Transport.VirtualizedCluster/Audit/Auditor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ private Auditor(Components.VirtualizedCluster cluster, Components.VirtualizedClu
3434

3535
public IEnumerable<Diagnostics.Auditing.Audit> AsyncAuditTrail { get; set; }
3636
public IEnumerable<Diagnostics.Auditing.Audit> AuditTrail { get; set; }
37-
public Func<Components.VirtualizedCluster> Cluster { get; set; }
37+
public Func<Components.VirtualizedCluster> Cluster { get; }
3838

3939
public TransportResponse Response { get; internal set; }
4040
public TransportResponse ResponseAsync { get; internal set; }

src/Elastic.Transport.VirtualizedCluster/Components/ExposingPipelineFactory.cs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,15 @@ namespace Elastic.Transport.VirtualizedCluster.Components;
1111
public sealed class ExposingPipelineFactory<TConfiguration> : RequestPipelineFactory
1212
where TConfiguration : class, ITransportConfiguration
1313
{
14-
public ExposingPipelineFactory(TConfiguration configuration, DateTimeProvider dateTimeProvider)
14+
public ExposingPipelineFactory(TConfiguration configuration)
1515
{
16-
DateTimeProvider = dateTimeProvider;
1716
Configuration = configuration;
18-
Pipeline = Create(new RequestData(Configuration, null, null), DateTimeProvider);
19-
RequestHandler = new DistributedTransport<TConfiguration>(Configuration, this, DateTimeProvider);
17+
Transport = new DistributedTransport<TConfiguration>(Configuration);
2018
}
2119

22-
// ReSharper disable once MemberCanBePrivate.Global
23-
public RequestPipeline Pipeline { get; }
24-
private DateTimeProvider DateTimeProvider { get; }
2520
private TConfiguration Configuration { get; }
26-
public ITransport<TConfiguration> RequestHandler { get; }
21+
public ITransport<TConfiguration> Transport { get; }
2722

28-
public override RequestPipeline Create(RequestData requestData, DateTimeProvider dateTimeProvider) =>
29-
new DefaultRequestPipeline(requestData, DateTimeProvider);
23+
public override RequestPipeline Create(RequestData requestData) =>
24+
new RequestPipeline(requestData);
3025
}

src/Elastic.Transport.VirtualizedCluster/Components/SealedVirtualCluster.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,21 @@ internal SealedVirtualCluster(VirtualCluster cluster, NodePool pool, TestableDat
3131
private TransportConfigurationDescriptor CreateSettings() =>
3232
new(_nodePool, _requestInvoker, serializer: null, _productRegistration.ProductRegistration);
3333

34+
3435
/// <summary> Create the cluster using all defaults on <see cref="TransportConfigurationDescriptor"/> </summary>
3536
public VirtualizedCluster AllDefaults() =>
36-
new(_dateTimeProvider, CreateSettings());
37+
new(CreateSettings());
3738

3839
/// <summary> Create the cluster using <paramref name="selector"/> to provide configuration changes </summary>
3940
/// <param name="selector">Provide custom configuration options</param>
4041
public VirtualizedCluster Settings(Func<TransportConfigurationDescriptor, TransportConfigurationDescriptor> selector) =>
41-
new(_dateTimeProvider, selector(CreateSettings()));
42+
new(selector(CreateSettings()));
4243

4344
/// <summary>
4445
/// Allows you to create an instance of `<see cref="VirtualClusterConnection"/> using the DSL provided by <see cref="Virtual"/>
4546
/// </summary>
4647
/// <param name="selector">Provide custom configuration options</param>
4748
public VirtualClusterRequestInvoker VirtualClusterConnection(Func<TransportConfigurationDescriptor, TransportConfigurationDescriptor> selector = null) =>
48-
new VirtualizedCluster(_dateTimeProvider, selector == null ? CreateSettings() : selector(CreateSettings()))
49+
new VirtualizedCluster(selector == null ? CreateSettings() : selector(CreateSettings()))
4950
.Connection;
5051
}

src/Elastic.Transport.VirtualizedCluster/Components/VirtualCluster.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ protected VirtualCluster(IEnumerable<Node> nodes, MockProductRegistration produc
1919
InternalNodes = nodes.ToList();
2020
}
2121

22-
public List<IClientCallRule> ClientCallRules { get; } = new List<IClientCallRule>();
23-
public TestableDateTimeProvider DateTimeProvider { get; } = new TestableDateTimeProvider();
22+
public List<IClientCallRule> ClientCallRules { get; } = new();
23+
private TestableDateTimeProvider TestDateTimeProvider { get; } = new();
2424

2525
protected List<Node> InternalNodes { get; }
2626
public IReadOnlyList<Node> Nodes => InternalNodes;
27-
public List<IRule> PingingRules { get; } = new List<IRule>();
27+
public List<IRule> PingingRules { get; } = new();
2828

29-
public List<ISniffRule> SniffingRules { get; } = new List<ISniffRule>();
29+
public List<ISniffRule> SniffingRules { get; } = new();
3030
internal string PublishAddressOverride { get; private set; }
3131

3232
internal bool SniffShouldReturnFqnd { get; private set; }
@@ -73,32 +73,34 @@ public VirtualCluster ClientCalls(Func<ClientCallRule, IClientCallRule> selector
7373
public SealedVirtualCluster SingleNodeConnection(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
7474
{
7575
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
76-
return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), DateTimeProvider, ProductRegistration);
76+
return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), TestDateTimeProvider, ProductRegistration);
7777
}
7878

7979
public SealedVirtualCluster StaticNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
8080
{
8181
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
82-
return new SealedVirtualCluster(this, new StaticNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration);
82+
var dateTimeProvider = TestDateTimeProvider;
83+
var nodePool = new StaticNodePool(nodes, false) { DateTimeProvider = dateTimeProvider };
84+
return new SealedVirtualCluster(this, nodePool , TestDateTimeProvider, ProductRegistration);
8385
}
8486

8587
public SealedVirtualCluster SniffingNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
8688
{
8789
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
88-
return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration);
90+
return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration);
8991
}
9092

9193
public SealedVirtualCluster StickyNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
9294
{
9395
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
94-
return new SealedVirtualCluster(this, new StickyNodePool(nodes, DateTimeProvider), DateTimeProvider, ProductRegistration);
96+
return new SealedVirtualCluster(this, new StickyNodePool(nodes) { DateTimeProvider = TestDateTimeProvider}, TestDateTimeProvider, ProductRegistration);
9597
}
9698

9799
public SealedVirtualCluster StickySniffingNodePool(Func<Node, float> sorter = null,
98100
Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null
99101
)
100102
{
101103
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
102-
return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter, DateTimeProvider), DateTimeProvider, ProductRegistration);
104+
return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration);
103105
}
104106
}

src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ private class VirtualResponse : TransportResponse;
2222

2323
private static readonly EndpointPath RootPath = new(HttpMethod.GET, "/");
2424

25-
internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, TransportConfigurationDescriptor settings)
25+
internal VirtualizedCluster(TransportConfigurationDescriptor settings)
2626
{
27-
_dateTimeProvider = dateTimeProvider;
2827
_settings = settings;
29-
_exposingRequestPipeline = new ExposingPipelineFactory<ITransportConfiguration>(settings, _dateTimeProvider);
28+
_dateTimeProvider = ((ITransportConfiguration)_settings).DateTimeProvider as TestableDateTimeProvider
29+
?? throw new ArgumentException("DateTime provider is not a TestableDateTimeProvider", nameof(_dateTimeProvider));
30+
_exposingRequestPipeline = new ExposingPipelineFactory<ITransportConfiguration>(settings);
3031

3132
_syncCall = (t, r) => t.Request<VirtualResponse>(
3233
path: RootPath,
@@ -52,7 +53,7 @@ internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, Transport
5253

5354
public VirtualClusterRequestInvoker Connection => RequestHandler.Configuration.Connection as VirtualClusterRequestInvoker;
5455
public NodePool ConnectionPool => RequestHandler.Configuration.NodePool;
55-
public ITransport<ITransportConfiguration> RequestHandler => _exposingRequestPipeline?.RequestHandler;
56+
public ITransport<ITransportConfiguration> RequestHandler => _exposingRequestPipeline?.Transport;
5657

5758
public VirtualizedCluster TransportProxiesTo(
5859
Func<ITransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor, IRequestConfiguration>, TransportResponse> sync,

src/Elastic.Transport/Components/NodePool/CloudNodePool.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,10 @@ public sealed class CloudNodePool : SingleNodePool
3636
/// <para> Read more here: https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html</para>
3737
/// </param>
3838
/// <param name="credentials"></param>
39-
/// <param name="dateTimeProvider">Optionally inject an instance of <see cref="DateTimeProvider"/> used to set <see cref="NodePool.LastUpdate"/></param>
40-
public CloudNodePool(string cloudId, AuthorizationHeader credentials, DateTimeProvider dateTimeProvider = null) : this(ParseCloudId(cloudId), dateTimeProvider) =>
39+
public CloudNodePool(string cloudId, AuthorizationHeader credentials) : this(ParseCloudId(cloudId)) =>
4140
AuthenticationHeader = credentials;
4241

43-
private CloudNodePool(ParsedCloudId parsedCloudId, DateTimeProvider dateTimeProvider = null) : base(parsedCloudId.Uri, dateTimeProvider) =>
42+
private CloudNodePool(ParsedCloudId parsedCloudId) : base(parsedCloudId.Uri) =>
4443
ClusterName = parsedCloudId.Name;
4544

4645
//TODO implement debugger display for NodePool implementations and display it there and its ToString()
@@ -92,7 +91,4 @@ private static ParsedCloudId ParseCloudId(string cloudId)
9291

9392
return new ParsedCloudId(clusterName, new Uri($"https://{elasticsearchUuid}.{domainName}"));
9493
}
95-
96-
/// <inheritdoc />
97-
protected override void Dispose(bool disposing) => base.Dispose(disposing);
9894
}

src/Elastic.Transport/Components/NodePool/NodePool.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ public abstract class NodePool : IDisposable
2020
{
2121
private bool _disposed;
2222

23-
internal NodePool() { }
24-
2523
/// <summary>
2624
/// The last time that this instance was updated.
2725
/// </summary>
28-
public abstract DateTimeOffset LastUpdate { get; protected set; }
26+
public abstract DateTimeOffset? LastUpdate { get; protected set; }
27+
28+
/// <inheritdoc cref="DateTimeProvider"/>>
29+
public DateTimeProvider DateTimeProvider { get; set; } = DefaultDateTimeProvider.Default;
2930

3031
/// <summary>
3132
/// Returns the default maximum retries for the connection pool implementation.
@@ -82,18 +83,15 @@ public void Dispose()
8283
/// <param name="disposing"></param>
8384
protected virtual void Dispose(bool disposing)
8485
{
85-
if (!_disposed)
86-
{
87-
_disposed = true;
88-
}
86+
if (!_disposed) _disposed = true;
8987
}
9088

9189
/// <summary>
9290
/// Creates a view over the nodes, with changing starting positions, that wraps over on each call
9391
/// e.g Thread A might get 1,2,3,4,5 and thread B will get 2,3,4,5,1.
9492
/// if there are no live nodes yields a different dead node to try once
9593
/// </summary>
96-
public abstract IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null);
94+
public abstract IEnumerable<Node> CreateView(Auditor? auditor = null);
9795

9896
/// <summary>
9997
/// Reseeds the nodes. The implementation is responsible for thread safety.

src/Elastic.Transport/Components/NodePool/SingleNodePool.cs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,15 @@ namespace Elastic.Transport;
1212
public class SingleNodePool : NodePool
1313
{
1414
/// <inheritdoc cref="SingleNodePool"/>
15-
public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null)
15+
public SingleNodePool(Uri uri)
1616
{
1717
var node = new Node(uri);
1818
UsingSsl = node.Uri.Scheme == "https";
1919
Nodes = new List<Node> { node };
20-
LastUpdate = (dateTimeProvider ?? DefaultDateTimeProvider.Default).Now();
2120
}
2221

2322
/// <inheritdoc />
24-
public override DateTimeOffset LastUpdate { get; protected set; }
23+
public override DateTimeOffset? LastUpdate { get; protected set; }
2524

2625
/// <inheritdoc />
2726
public override int MaxRetries => 0;
@@ -39,11 +38,8 @@ public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null)
3938
public override bool UsingSsl { get; protected set; }
4039

4140
/// <inheritdoc />
42-
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null) => Nodes;
41+
public override IEnumerable<Node> CreateView(Auditor? auditor) => Nodes;
4342

4443
/// <inheritdoc />
4544
public override void Reseed(IEnumerable<Node> nodes) { } //ignored
46-
47-
/// <inheritdoc />
48-
protected override void Dispose(bool disposing) => base.Dispose(disposing);
4945
}

src/Elastic.Transport/Components/NodePool/SniffingNodePool.cs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,13 @@ public class SniffingNodePool : StaticNodePool
2222
private readonly ReaderWriterLockSlim _readerWriter = new();
2323

2424
/// <inheritdoc cref="SniffingNodePool"/>>
25-
public SniffingNodePool(IEnumerable<Uri> uris, bool randomize = true, DateTimeProvider dateTimeProvider = null)
26-
: base(uris, randomize, dateTimeProvider) { }
25+
public SniffingNodePool(IEnumerable<Uri> uris, bool randomize = true) : base(uris, randomize) { }
2726

2827
/// <inheritdoc cref="SniffingNodePool"/>>
29-
public SniffingNodePool(IEnumerable<Node> nodes, bool randomize = true, DateTimeProvider dateTimeProvider = null)
30-
: base(nodes, randomize, dateTimeProvider) { }
28+
public SniffingNodePool(IEnumerable<Node> nodes, bool randomize = true) : base(nodes, randomize) { }
3129

3230
/// <inheritdoc cref="SniffingNodePool"/>>
33-
public SniffingNodePool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer, DateTimeProvider dateTimeProvider = null)
34-
: base(nodes, nodeScorer, dateTimeProvider) { }
31+
public SniffingNodePool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer) : base(nodes, nodeScorer) { }
3532

3633
/// <inheritdoc />
3734
public override IReadOnlyCollection<Node> Nodes
@@ -81,12 +78,12 @@ public override void Reseed(IEnumerable<Node> nodes)
8178
}
8279

8380
/// <inheritdoc />
84-
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
81+
public override IEnumerable<Node> CreateView(Auditor? auditor)
8582
{
8683
_readerWriter.EnterReadLock();
8784
try
8885
{
89-
return base.CreateView(audit);
86+
return base.CreateView(auditor);
9087
}
9188
finally
9289
{

0 commit comments

Comments
 (0)