Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public interface IRequestConfiguration
bool? DisableSniff { get; }

/// <summary>
/// Whether or not this request should be pipelined. http://en.wikipedia.org/wiki/HTTP_pipelining defaults to true
/// Whether this request should be pipelined. <see href="http://en.wikipedia.org/wiki/HTTP_pipelining"/> defaults to <see langword="true"/>.
/// </summary>
bool? HttpPipeliningEnabled { get; }

Expand Down
63 changes: 54 additions & 9 deletions src/Elastic.Transport/DistributedTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Transport.Diagnostics;
Expand Down Expand Up @@ -35,6 +36,8 @@ public class DistributedTransport<TConfiguration> : ITransport<TConfiguration>
{
private readonly ProductRegistration _productRegistration;

private ConditionalWeakTable<RequestConfiguration, BoundConfiguration>? _boundConfigurations;

/// <summary>
/// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on
/// different nodes
Expand Down Expand Up @@ -97,15 +100,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(

try
{
// Unless per request configuration is provided, we can reuse a BoundConfiguration
// that is specific to this transport. If the IRequestConfiguration is an instance
// of BoundConfiguration we use that cached instance directly without rebinding.
var boundConfiguration = localConfiguration switch
{
BoundConfiguration bc => bc,
{ } rc => new BoundConfiguration(Configuration, rc),
_ => TransportBoundConfiguration
};
var boundConfiguration = BindConfiguration(localConfiguration);

Configuration.OnConfigurationBound?.Invoke(boundConfiguration);

Expand Down Expand Up @@ -273,6 +268,56 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
}
}

private BoundConfiguration BindConfiguration(IRequestConfiguration? localConfiguration)
{
// Unless per request configuration is provided, we can reuse a BoundConfiguration
// that is specific to this transport. If the IRequestConfiguration is an instance
// of BoundConfiguration we use that cached instance directly without rebinding.
return localConfiguration switch
{
BoundConfiguration bc => bc,
RequestConfiguration rc => GetOrCreateBoundConfiguration(rc),
not null => new BoundConfiguration(Configuration, localConfiguration),
_ => TransportBoundConfiguration
};

BoundConfiguration GetOrCreateBoundConfiguration(RequestConfiguration rc)
{
// Cache `BoundConfiguration` for requests with local request configuration.

// Since `IRequestConfiguration` might be implemented as mutable class, we use the
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, the ClientCertificates and Headers properties are defined as specialized mutable collections. We should think about refactoring this part.

// cache only with the immutable `RequestConfiguration` record.

// ReSharper disable InconsistentlySynchronizedField

var cache = (Interlocked.CompareExchange(
ref _boundConfigurations,
new ConditionalWeakTable<RequestConfiguration, BoundConfiguration>(),
null
) ?? _boundConfigurations)!;

if (cache.TryGetValue(rc, out var boundConfiguration))
{
return boundConfiguration;
}

boundConfiguration = new BoundConfiguration(Configuration, rc);

#if NET8_0_OR_GREATER
cache.TryAdd(rc, boundConfiguration);
#else
lock (cache)
{
cache.Add(rc, boundConfiguration);
}
#endif

// ReSharper restore InconsistentlySynchronizedField

return boundConfiguration;
}
}

private static void ThrowUnexpectedTransportException<TResponse>(Exception killerException,
List<PipelineException> seenExceptions,
Endpoint endpoint,
Expand Down
Loading