Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TLS support #27

Merged
merged 3 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ nunit-agent*
test-output.log
TestResults.xml
TestResult.xml
Tests/coverage.xml
test.sh
*.VisualState.xml
.vscode
Expand Down Expand Up @@ -121,7 +120,7 @@ projects/Unit*/TestResult.xml
.*.sw?

# tests
Tests/coverage.*
coverage*

# docs
docs/temp/
Expand Down
12 changes: 10 additions & 2 deletions RabbitMQ.AMQP.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@

namespace RabbitMQ.AMQP.Client;

public class ConnectionException(string? message) : Exception(message);
public class ConnectionException : Exception
{
public ConnectionException(string message) : base(message)
{
}

public ConnectionException(string message, Exception innerException) : base(message, innerException)
{
}
}

public interface IConnection : ILifeCycle
{
Expand All @@ -12,6 +21,5 @@ public interface IConnection : ILifeCycle

IConsumerBuilder ConsumerBuilder();


public ReadOnlyCollection<IPublisher> GetPublishers();
}
59 changes: 45 additions & 14 deletions RabbitMQ.AMQP.Client/IConnectionSettings.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,50 @@
using System.Net.Security;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;

namespace RabbitMQ.AMQP.Client;

public interface IConnectionSettings
public interface IConnectionSettings : IEquatable<IConnectionSettings>
{
string Host();

int Port();

string VirtualHost();


string User();

string Password();

string Scheme();
string Host { get; }
int Port { get; }
string VirtualHost { get; }
string User { get; }
string Password { get; }
string Scheme { get; }
string ConnectionName { get; }
string Path { get; }
bool UseSsl { get; }
ITlsSettings? TlsSettings { get; }
}

string ConnectionName();
/// <summary>
/// Contains the TLS/SSL settings for a connection.
/// </summary>
public interface ITlsSettings
{
/// <summary>
/// Client certificates to use for mutual authentication.
/// </summary>
X509CertificateCollection ClientCertificates { get; }

/// <summary>
/// Supported protocols to use.
/// </summary>
SslProtocols Protocols { get; }

/// <summary>
/// Specifies whether certificate revocation should be performed during handshake.
/// </summary>
bool CheckCertificateRevocation { get; }

/// <summary>
/// Gets or sets a certificate validation callback to validate remote certificate.
/// </summary>
RemoteCertificateValidationCallback? RemoteCertificateValidationCallback { get; }

/// <summary>
/// Gets or sets a local certificate selection callback to select the certificate which should be used for authentication.
/// </summary>
LocalCertificateSelectionCallback? LocalCertificateSelectionCallback { get; }
}
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ protected void OnNewStatus(State newState, Error? error)
return;
}

var oldStatus = State;
State oldStatus = State;
State = newState;
ChangeState?.Invoke(this, oldStatus, newState, error);
}
Expand Down
64 changes: 48 additions & 16 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
private const string ConnectionNotRecoveredMessage = "Connection not recovered";
private readonly SemaphoreSlim _semaphoreClose = new(1, 1);


// The native AMQP.Net Lite connection
private Connection? _nativeConnection;

Expand Down Expand Up @@ -71,7 +70,6 @@ private void ChangeConsumersStatus(State state, Error? error)
}
}


private async Task ReconnectEntities()
{
await ReconnectPublishers().ConfigureAwait(false);
Expand Down Expand Up @@ -102,7 +100,6 @@ private async Task ReconnectConsumers()
// TODO: Implement the semaphore to avoid multiple connections
// private readonly SemaphoreSlim _semaphore = new(1, 1);


/// <summary>
/// Publishers contains all the publishers created by the connection.
/// Each connection can have multiple publishers.
Expand All @@ -113,7 +110,6 @@ private async Task ReconnectConsumers()

internal ConcurrentDictionary<string, IConsumer> Consumers { get; } = new();


public ReadOnlyCollection<IPublisher> GetPublishers()
{
return Publishers.Values.ToList().AsReadOnly();
Expand Down Expand Up @@ -179,14 +175,18 @@ public IConsumerBuilder ConsumerBuilder()
return new AmqpConsumerBuilder(this);
}

protected override Task OpenAsync()
protected override async Task OpenAsync()
{
EnsureConnection();
return base.OpenAsync();
await EnsureConnection()
.ConfigureAwait(false);
await base.OpenAsync()
.ConfigureAwait(false);
}

private void EnsureConnection()
private async Task EnsureConnection()
{
// TODO: do this!
// await _semaphore.WaitAsync();
try
{
if (_nativeConnection is { IsClosed: false })
Expand All @@ -196,22 +196,53 @@ private void EnsureConnection()

var open = new Open
{
HostName = $"vhost:{_connectionSettings.VirtualHost()}",
HostName = $"vhost:{_connectionSettings.VirtualHost}",
Properties = new Fields()
{
[new Symbol("connection_name")] = _connectionSettings.ConnectionName(),
[new Symbol("connection_name")] = _connectionSettings.ConnectionName,
}
};

var manualReset = new ManualResetEvent(false);
_nativeConnection = new Connection(_connectionSettings.Address, null, open, (connection, open1) =>
void onOpened(Amqp.IConnection connection, Open open1)
{
manualReset.Set();
Trace.WriteLine(TraceLevel.Verbose, $"Connection opened. Info: {ToString()}");
OnNewStatus(State.Open, null);
});
}

var cf = new ConnectionFactory();

if (_connectionSettings.UseSsl && _connectionSettings.TlsSettings is not null)
{
cf.SSL.Protocols = _connectionSettings.TlsSettings.Protocols;
cf.SSL.CheckCertificateRevocation = _connectionSettings.TlsSettings.CheckCertificateRevocation;

if (_connectionSettings.TlsSettings.ClientCertificates.Count > 0)
{
cf.SSL.ClientCertificates = _connectionSettings.TlsSettings.ClientCertificates;
}

if (_connectionSettings.TlsSettings.LocalCertificateSelectionCallback is not null)
{
cf.SSL.LocalCertificateSelectionCallback = _connectionSettings.TlsSettings.LocalCertificateSelectionCallback;
}

if (_connectionSettings.TlsSettings.RemoteCertificateValidationCallback is not null)
{
cf.SSL.RemoteCertificateValidationCallback = _connectionSettings.TlsSettings.RemoteCertificateValidationCallback;
}
}

try
{
_nativeConnection = await cf.CreateAsync(_connectionSettings.Address, open: open, onOpened: onOpened)
.ConfigureAwait(false);
}
catch (Exception ex)
{
throw new ConnectionException(
$"Connection failed. Info: {ToString()}", ex);
}

manualReset.WaitOne(TimeSpan.FromSeconds(5));
if (_nativeConnection.IsClosed)
{
throw new ConnectionException(
Expand Down Expand Up @@ -294,7 +325,8 @@ await Task.Run(async () =>
await Task.Delay(TimeSpan.FromMilliseconds(next))
.ConfigureAwait(false);

EnsureConnection();
await EnsureConnection()
.ConfigureAwait(false);
connected = true;
}
catch (Exception e)
Expand Down
Loading