Skip to content

Commit

Permalink
Add the virtualhost configuration
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio committed Jun 5, 2024
1 parent dbae1c3 commit da9fbe3
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 48 deletions.
26 changes: 15 additions & 11 deletions RabbitMQ.AMQP.Client/AmqpAddress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ public class AmqpAddressBuilder
private int _port = 5672;
private string _user = "guest";
private string _password = "guest";
private string _path = "/";
private string _scheme = "AMQP";
private string _connection = "AMQP.NET";
private string _virtualHost = "/";


public AmqpAddressBuilder Host(string host)
Expand All @@ -37,11 +37,6 @@ public AmqpAddressBuilder Password(string password)
return this;
}

public AmqpAddressBuilder Path(string path)
{
_path = path;
return this;
}

public AmqpAddressBuilder Scheme(string scheme)
{
Expand All @@ -55,10 +50,16 @@ public AmqpAddressBuilder ConnectionName(string connection)
return this;
}

public AmqpAddressBuilder VirtualHost(string virtualHost)
{
_virtualHost = virtualHost;
return this;
}

public AmqpAddress Build()
{
return new AmqpAddress(_host, _port, _user,
_password, _path,
_password, _virtualHost,
_scheme, _connection);
}
}
Expand All @@ -71,6 +72,7 @@ public class AmqpAddress : IAddress
internal Address Address { get; }

private readonly string _connectionName = "AMQP.NET";
private readonly string _virtualHost = "/";


public AmqpAddress(string address)
Expand All @@ -81,10 +83,11 @@ public AmqpAddress(string address)
public AmqpAddress(string host, int port,
string user,
string password,
string path, string scheme, string connectionName)
string virtualHost, string scheme, string connectionName)
{
Address = new Address(host, port, user, password, path, scheme);
Address = new Address(host, port, user, password, "/", scheme);
_connectionName = connectionName;
_virtualHost = virtualHost;
}

public string Host()
Expand All @@ -99,9 +102,10 @@ public int Port()
}


public string Path()

public string VirtualHost()
{
return Address.Path;
return _virtualHost;
}

public string User()
Expand Down
63 changes: 42 additions & 21 deletions RabbitMQ.AMQP.Client/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public async Task ConnectAsync(IAddress address)
.Port(address.Port())
.User(address.User())
.Password(address.Password())
.Path(address.Path())
.VirtualHost(address.VirtualHost())
.ConnectionName(address.ConnectionName())
.Scheme(address.Scheme())
.Build();
Expand All @@ -32,32 +32,53 @@ public async Task ConnectAsync(IAddress address)

internal async Task EnsureConnectionAsync()
{
if (_nativeConnection == null || _nativeConnection.IsClosed)
try
{
var open = new Open
if (_nativeConnection == null || _nativeConnection.IsClosed)
{
Properties = new Fields()
var open = new Open
{
[new Symbol("connection_name")] = _address.ConnectionName()
}
};
var connection = await Connection.Factory.CreateAsync(_address.Address, open);
connection.Closed += (sender, error) =>
{
var unexpected = Status != Status.Closed;
Status = Status.Closed;
HostName = $"vhost:{_address.VirtualHost()}",
Properties = new Fields()
{
[new Symbol("connection_name")] = _address.ConnectionName(),
}
};
var connection = await Connection.Factory.CreateAsync(_address.Address, open);
connection.Closed += (sender, error) =>
{
var unexpected = Status != Status.Closed;
Status = Status.Closed;
Closed?.Invoke(this, unexpected);
Closed?.Invoke(this, unexpected);
Trace.WriteLine(TraceLevel.Warning, $"connection is closed " +
$"{sender} {error} {Status} " +
$"{connection.IsClosed}");
};
_nativeConnection = connection;
_management.Init(connection);
}
Trace.WriteLine(TraceLevel.Warning, $"connection is closed " +
$"{sender} {error} {Status} " +
$"{connection.IsClosed}");
};
_nativeConnection = connection;
_management.Init(connection);
}

Status = Status.Open;
Status = Status.Open;
}
catch (Amqp.AmqpException e)
{
throw new ConnectionException("AmqpException: Connection failed", e);
}
catch (System.OperationCanceledException e)
{
// wrong virtual host
throw new ConnectionException("OperationCanceledException: Connection failed", e);
}

catch (NotSupportedException e)
{
// wrong schema
throw new ConnectionException("NotSupportedException: Connection failed", e);
}


}


Expand Down
3 changes: 2 additions & 1 deletion RabbitMQ.AMQP.Client/IAddress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ public interface IAddress

int Port();

string Path();
string VirtualHost();


string User();

Expand Down
4 changes: 2 additions & 2 deletions RabbitMQ.AMQP.Client/IConnection.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
namespace RabbitMQ.AMQP.Client;

public class ConnectionException(string? message, Exception? innerException) : Exception(message, innerException);

public interface IConnection
{
IManagement Management();
Task ConnectAsync(IAddress amqpAddress);


}
55 changes: 51 additions & 4 deletions Tests/ConnectionRecoverTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ namespace Tests;

public class ConnectionRecoverTests
{

[Fact]
public async void UnexpectedClose()
public async void NormalCloseShouldSetUnexpectedFlagToFalse()
{
AmqpConnection connection = new();

var completion = new TaskCompletionSource<bool>();
connection.Closed += (sender, unexpected) =>
{
Expand All @@ -22,12 +22,59 @@ public async void UnexpectedClose()

var connectionName = Guid.NewGuid().ToString();
await connection.ConnectAsync(new AmqpAddressBuilder().ConnectionName(connectionName).Build());
SystemUtils.WaitUntilAsync(async () => await SystemUtils.ConnectionsCountByName(connectionName) == 1);
SystemUtils.WaitUntilAsync(async () => await SystemUtils.HttpKillConnections(connectionName) == 1);
Assert.Equal(Status.Open, connection.Status);
await connection.CloseAsync();

var result = await completion.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.False(result);
Assert.Equal(Status.Closed, connection.Status);
}
[Fact]
public async void UnexpectedCloseShouldSetUnexpectedFlag()
{
AmqpConnection connection = new();
var completion = new TaskCompletionSource<bool>();
connection.Closed += (sender, unexpected) =>
{
if (!unexpected) return;
Assert.Equal(Status.Closed, connection.Status);
completion.SetResult(true);
};

var connectionName = Guid.NewGuid().ToString();
await connection.ConnectAsync(new AmqpAddressBuilder().ConnectionName(connectionName).Build());
SystemUtils.WaitUntilConnectionIsKilled(connectionName);

var result = await completion.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.True(result);
Assert.Equal(Status.Closed, connection.Status);
await connection.CloseAsync();
Assert.Equal(Status.Closed, connection.Status);
}


[Fact]
public async void RecoverFromUnexpectedClose()
{
AmqpConnection connection = new();
var completion = new TaskCompletionSource<Status>();
connection.Closed += async (sender, unexpected) =>
{
if (!unexpected) return;
Assert.Equal(Status.Closed, connection.Status);
Assert.True(unexpected);
SystemUtils.Wait();
await connection.EnsureConnectionAsync();
completion.SetResult(connection.Status);
};

var connectionName = Guid.NewGuid().ToString();
await connection.ConnectAsync(new AmqpAddressBuilder().ConnectionName(connectionName).Build());
SystemUtils.WaitUntilConnectionIsKilled(connectionName);

var result = await completion.Task.WaitAsync(TimeSpan.FromSeconds(25));
Assert.Equal(Status.Open, result);
await connection.CloseAsync();
Assert.Equal(Status.Closed, connection.Status);
}
}
49 changes: 43 additions & 6 deletions Tests/ConnectionTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
using RabbitMQ.AMQP.Client;
using System;
using System.Net.Sockets;
using Amqp;
using Amqp.Framing;
using RabbitMQ.AMQP.Client;

namespace Tests;

Expand All @@ -10,12 +14,12 @@ public class ConnectionTests
public void ValidateAddress()
{
AmqpAddress amqpAddress = new("localhost", 5672, "guest-user",
"guest-password", "path/", "amqp1", "connection_name");
"guest-password", "vhost_1", "amqp1", "connection_name");
Assert.Equal("localhost", amqpAddress.Host());
Assert.Equal(5672, amqpAddress.Port());
Assert.Equal("guest-user", amqpAddress.User());
Assert.Equal("guest-password", amqpAddress.Password());
Assert.Equal("path/", amqpAddress.Path());
Assert.Equal("vhost_1", amqpAddress.VirtualHost());
Assert.Equal("amqp1", amqpAddress.Scheme());

AmqpAddress second = new("localhost", 5672, "guest-user",
Expand All @@ -32,10 +36,10 @@ public void ValidateAddress()
[Fact]
public void ValidateAddressBuilder()
{
AmqpAddress address = new AmqpAddressBuilder()
var address = new AmqpAddressBuilder()
.Host("localhost")
.Port(5672)
.Path("path/")
.VirtualHost("v1")
.User("guest-t")
.Password("guest-w")
.Scheme("amqp1")
Expand All @@ -45,7 +49,40 @@ public void ValidateAddressBuilder()
Assert.Equal(5672, address.Port());
Assert.Equal("guest-t", address.User());
Assert.Equal("guest-w", address.Password());
Assert.Equal("path/", address.Path());
Assert.Equal("v1", address.VirtualHost());
Assert.Equal("amqp1", address.Scheme());
}

[Fact]
public async void RaiseErrorsIfTheParametersAreNotValid()
{
AmqpConnection connection = new();
await Assert.ThrowsAsync<ConnectionException>( async () => await connection.ConnectAsync(new
AmqpAddressBuilder().VirtualHost("wrong_vhost").Build()));
Assert.Equal(Status.Closed, connection.Status);


await Assert.ThrowsAsync<SocketException>( async () => await connection.ConnectAsync(new
AmqpAddressBuilder().Host("wrong_host").Build()));
Assert.Equal(Status.Closed, connection.Status);


await Assert.ThrowsAsync<ConnectionException>( async () => await connection.ConnectAsync(new
AmqpAddressBuilder().Password("wrong_password").Build()));
Assert.Equal(Status.Closed, connection.Status);

await Assert.ThrowsAsync<ConnectionException>( async () => await connection.ConnectAsync(new
AmqpAddressBuilder().User("wrong_user").Build()));
Assert.Equal(Status.Closed, connection.Status);

await Assert.ThrowsAsync<SocketException>( async () => await connection.ConnectAsync(new
AmqpAddressBuilder().Port(0).Build()));
Assert.Equal(Status.Closed, connection.Status);


await Assert.ThrowsAsync<ConnectionException>( async () => await connection.ConnectAsync(new
AmqpAddressBuilder().Scheme("wrong_scheme").Build()));
Assert.Equal(Status.Closed, connection.Status);
}

}
13 changes: 10 additions & 3 deletions Tests/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ public static async Task<bool> IsConnectionOpen(string connectionName)
var result = await client.GetAsync("http://localhost:15672/api/connections");
if (!result.IsSuccessStatusCode)
{
throw new XunitException(string.Format("HTTP GET failed: {0} {1}", result.StatusCode,
result.ReasonPhrase));
throw new XunitException($"HTTP GET failed: {result.StatusCode} {result.ReasonPhrase}");
}

var obj = await JsonSerializer.DeserializeAsync(result.Content.ReadAsStream(), typeof(IEnumerable<Connection>));
var obj = await JsonSerializer.DeserializeAsync(await result.Content.ReadAsStreamAsync(),
typeof(IEnumerable<Connection>));
if (obj == null) return false;
var connections = obj as IEnumerable<Connection>;
isOpen = connections.Any(x => x.client_properties["connection_name"].Contains(connectionName));
Expand Down Expand Up @@ -193,6 +193,13 @@ public static async Task<int> HttpKillConnections(string connectionName)
return killed;
}

public static int WaitUntilConnectionIsKilled(string connectionName)
{
Wait();
WaitUntilAsync(async () => await HttpKillConnections(connectionName) == 1);
return 1;
}

private static HttpClient CreateHttpClient()
{
var handler = new HttpClientHandler { Credentials = new NetworkCredential("guest", "guest"), };
Expand Down

0 comments on commit da9fbe3

Please sign in to comment.