|
6 | 6 | using System.Collections.Generic; |
7 | 7 | using System.Diagnostics; |
8 | 8 | using System.Linq; |
| 9 | +using System.Runtime.CompilerServices; |
9 | 10 | using System.Threading; |
10 | 11 | using System.Threading.Tasks; |
11 | 12 | using Elastic.Transport.Diagnostics; |
@@ -35,6 +36,8 @@ public class DistributedTransport<TConfiguration> : ITransport<TConfiguration> |
35 | 36 | { |
36 | 37 | private readonly ProductRegistration _productRegistration; |
37 | 38 |
|
| 39 | + private ConditionalWeakTable<RequestConfiguration, BoundConfiguration>? _boundConfigurations; |
| 40 | + |
38 | 41 | /// <summary> |
39 | 42 | /// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on |
40 | 43 | /// different nodes |
@@ -97,15 +100,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>( |
97 | 100 |
|
98 | 101 | try |
99 | 102 | { |
100 | | - // Unless per request configuration is provided, we can reuse a BoundConfiguration |
101 | | - // that is specific to this transport. If the IRequestConfiguration is an instance |
102 | | - // of BoundConfiguration we use that cached instance directly without rebinding. |
103 | | - var boundConfiguration = localConfiguration switch |
104 | | - { |
105 | | - BoundConfiguration bc => bc, |
106 | | - { } rc => new BoundConfiguration(Configuration, rc), |
107 | | - _ => TransportBoundConfiguration |
108 | | - }; |
| 103 | + var boundConfiguration = BindConfiguration(localConfiguration); |
109 | 104 |
|
110 | 105 | Configuration.OnConfigurationBound?.Invoke(boundConfiguration); |
111 | 106 |
|
@@ -273,6 +268,56 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>( |
273 | 268 | } |
274 | 269 | } |
275 | 270 |
|
| 271 | + private BoundConfiguration BindConfiguration(IRequestConfiguration? localConfiguration) |
| 272 | + { |
| 273 | + // Unless per request configuration is provided, we can reuse a BoundConfiguration |
| 274 | + // that is specific to this transport. If the IRequestConfiguration is an instance |
| 275 | + // of BoundConfiguration we use that cached instance directly without rebinding. |
| 276 | + return localConfiguration switch |
| 277 | + { |
| 278 | + BoundConfiguration bc => bc, |
| 279 | + RequestConfiguration rc => GetOrCreateBoundConfiguration(rc), |
| 280 | + not null => new BoundConfiguration(Configuration, localConfiguration), |
| 281 | + _ => TransportBoundConfiguration |
| 282 | + }; |
| 283 | + |
| 284 | + BoundConfiguration GetOrCreateBoundConfiguration(RequestConfiguration rc) |
| 285 | + { |
| 286 | + // Cache `BoundConfiguration` for requests with local request configuration. |
| 287 | + |
| 288 | + // Since `IRequestConfiguration` might be implemented as mutable class, we use the |
| 289 | + // cache only with the immutable `RequestConfiguration` record. |
| 290 | + |
| 291 | + // ReSharper disable InconsistentlySynchronizedField |
| 292 | + |
| 293 | + var cache = (Interlocked.CompareExchange( |
| 294 | + ref _boundConfigurations, |
| 295 | + new ConditionalWeakTable<RequestConfiguration, BoundConfiguration>(), |
| 296 | + null |
| 297 | + ) ?? _boundConfigurations)!; |
| 298 | + |
| 299 | + if (cache.TryGetValue(rc, out var boundConfiguration)) |
| 300 | + { |
| 301 | + return boundConfiguration; |
| 302 | + } |
| 303 | + |
| 304 | + boundConfiguration = new BoundConfiguration(Configuration, rc); |
| 305 | + |
| 306 | +#if NET8_0_OR_GREATER |
| 307 | + cache.TryAdd(rc, boundConfiguration); |
| 308 | +#else |
| 309 | + lock (cache) |
| 310 | + { |
| 311 | + cache.Add(rc, boundConfiguration); |
| 312 | + } |
| 313 | +#endif |
| 314 | + |
| 315 | + // ReSharper restore InconsistentlySynchronizedField |
| 316 | + |
| 317 | + return boundConfiguration; |
| 318 | + } |
| 319 | + } |
| 320 | + |
276 | 321 | private static void ThrowUnexpectedTransportException<TResponse>(Exception killerException, |
277 | 322 | List<PipelineException> seenExceptions, |
278 | 323 | Endpoint endpoint, |
|
0 commit comments