diff --git a/RabbitMQ.AMQP.Client/AmqpAddress.cs b/RabbitMQ.AMQP.Client/AmqpAddress.cs index c1ecd66..d7ff6f5 100644 --- a/RabbitMQ.AMQP.Client/AmqpAddress.cs +++ b/RabbitMQ.AMQP.Client/AmqpAddress.cs @@ -8,9 +8,9 @@ public class AmqpAddressBuilder private int _port = 5672; private string _user = "guest"; private string _password = "guest"; - private string _path = "/"; private string _scheme = "AMQP"; private string _connection = "AMQP.NET"; + private string _virtualHost = "/"; public AmqpAddressBuilder Host(string host) @@ -37,11 +37,6 @@ public AmqpAddressBuilder Password(string password) return this; } - public AmqpAddressBuilder Path(string path) - { - _path = path; - return this; - } public AmqpAddressBuilder Scheme(string scheme) { @@ -55,10 +50,16 @@ public AmqpAddressBuilder ConnectionName(string connection) return this; } + public AmqpAddressBuilder VirtualHost(string virtualHost) + { + _virtualHost = virtualHost; + return this; + } + public AmqpAddress Build() { return new AmqpAddress(_host, _port, _user, - _password, _path, + _password, _virtualHost, _scheme, _connection); } } @@ -71,6 +72,7 @@ public class AmqpAddress : IAddress internal Address Address { get; } private readonly string _connectionName = "AMQP.NET"; + private readonly string _virtualHost = "/"; public AmqpAddress(string address) @@ -81,10 +83,11 @@ public AmqpAddress(string address) public AmqpAddress(string host, int port, string user, string password, - string path, string scheme, string connectionName) + string virtualHost, string scheme, string connectionName) { - Address = new Address(host, port, user, password, path, scheme); + Address = new Address(host, port, user, password, "/", scheme); _connectionName = connectionName; + _virtualHost = virtualHost; } public string Host() @@ -99,9 +102,10 @@ public int Port() } - public string Path() + + public string VirtualHost() { - return Address.Path; + return _virtualHost; } public string User() diff --git a/RabbitMQ.AMQP.Client/AmqpConnection.cs b/RabbitMQ.AMQP.Client/AmqpConnection.cs index 8565bc5..72ce1da 100644 --- a/RabbitMQ.AMQP.Client/AmqpConnection.cs +++ b/RabbitMQ.AMQP.Client/AmqpConnection.cs @@ -22,7 +22,7 @@ public async Task ConnectAsync(IAddress address) .Port(address.Port()) .User(address.User()) .Password(address.Password()) - .Path(address.Path()) + .VirtualHost(address.VirtualHost()) .ConnectionName(address.ConnectionName()) .Scheme(address.Scheme()) .Build(); @@ -32,32 +32,53 @@ public async Task ConnectAsync(IAddress address) internal async Task EnsureConnectionAsync() { - if (_nativeConnection == null || _nativeConnection.IsClosed) + try { - var open = new Open + if (_nativeConnection == null || _nativeConnection.IsClosed) { - Properties = new Fields() + var open = new Open { - [new Symbol("connection_name")] = _address.ConnectionName() - } - }; - var connection = await Connection.Factory.CreateAsync(_address.Address, open); - connection.Closed += (sender, error) => - { - var unexpected = Status != Status.Closed; - Status = Status.Closed; + HostName = $"vhost:{_address.VirtualHost()}", + Properties = new Fields() + { + [new Symbol("connection_name")] = _address.ConnectionName(), + } + }; + var connection = await Connection.Factory.CreateAsync(_address.Address, open); + connection.Closed += (sender, error) => + { + var unexpected = Status != Status.Closed; + Status = Status.Closed; - Closed?.Invoke(this, unexpected); + Closed?.Invoke(this, unexpected); - Trace.WriteLine(TraceLevel.Warning, $"connection is closed " + - $"{sender} {error} {Status} " + - $"{connection.IsClosed}"); - }; - _nativeConnection = connection; - _management.Init(connection); - } + Trace.WriteLine(TraceLevel.Warning, $"connection is closed " + + $"{sender} {error} {Status} " + + $"{connection.IsClosed}"); + }; + _nativeConnection = connection; + _management.Init(connection); + } - Status = Status.Open; + Status = Status.Open; + } + catch (Amqp.AmqpException e) + { + throw new ConnectionException("AmqpException: Connection failed", e); + } + catch (System.OperationCanceledException e) + { + // wrong virtual host + throw new ConnectionException("OperationCanceledException: Connection failed", e); + } + + catch (NotSupportedException e) + { + // wrong schema + throw new ConnectionException("NotSupportedException: Connection failed", e); + } + + } diff --git a/RabbitMQ.AMQP.Client/IAddress.cs b/RabbitMQ.AMQP.Client/IAddress.cs index f815c47..3356b8f 100644 --- a/RabbitMQ.AMQP.Client/IAddress.cs +++ b/RabbitMQ.AMQP.Client/IAddress.cs @@ -6,7 +6,8 @@ public interface IAddress int Port(); - string Path(); + string VirtualHost(); + string User(); diff --git a/RabbitMQ.AMQP.Client/IConnection.cs b/RabbitMQ.AMQP.Client/IConnection.cs index a22f881..b26e1ce 100644 --- a/RabbitMQ.AMQP.Client/IConnection.cs +++ b/RabbitMQ.AMQP.Client/IConnection.cs @@ -1,9 +1,9 @@ namespace RabbitMQ.AMQP.Client; +public class ConnectionException(string? message, Exception? innerException) : Exception(message, innerException); + public interface IConnection { IManagement Management(); Task ConnectAsync(IAddress amqpAddress); - - } \ No newline at end of file diff --git a/Tests/ConnectionRecoverTests.cs b/Tests/ConnectionRecoverTests.cs index ea98b12..ab5846f 100644 --- a/Tests/ConnectionRecoverTests.cs +++ b/Tests/ConnectionRecoverTests.cs @@ -8,11 +8,11 @@ namespace Tests; public class ConnectionRecoverTests { + [Fact] - public async void UnexpectedClose() + public async void NormalCloseShouldSetUnexpectedFlagToFalse() { AmqpConnection connection = new(); - var completion = new TaskCompletionSource(); connection.Closed += (sender, unexpected) => { @@ -22,12 +22,59 @@ public async void UnexpectedClose() var connectionName = Guid.NewGuid().ToString(); await connection.ConnectAsync(new AmqpAddressBuilder().ConnectionName(connectionName).Build()); - SystemUtils.WaitUntilAsync(async () => await SystemUtils.ConnectionsCountByName(connectionName) == 1); - SystemUtils.WaitUntilAsync(async () => await SystemUtils.HttpKillConnections(connectionName) == 1); + Assert.Equal(Status.Open, connection.Status); + await connection.CloseAsync(); + + var result = await completion.Task.WaitAsync(TimeSpan.FromSeconds(5)); + Assert.False(result); + Assert.Equal(Status.Closed, connection.Status); + } + [Fact] + public async void UnexpectedCloseShouldSetUnexpectedFlag() + { + AmqpConnection connection = new(); + var completion = new TaskCompletionSource(); + connection.Closed += (sender, unexpected) => + { + if (!unexpected) return; + Assert.Equal(Status.Closed, connection.Status); + completion.SetResult(true); + }; + + var connectionName = Guid.NewGuid().ToString(); + await connection.ConnectAsync(new AmqpAddressBuilder().ConnectionName(connectionName).Build()); + SystemUtils.WaitUntilConnectionIsKilled(connectionName); var result = await completion.Task.WaitAsync(TimeSpan.FromSeconds(5)); Assert.True(result); Assert.Equal(Status.Closed, connection.Status); await connection.CloseAsync(); + Assert.Equal(Status.Closed, connection.Status); + } + + + [Fact] + public async void RecoverFromUnexpectedClose() + { + AmqpConnection connection = new(); + var completion = new TaskCompletionSource(); + connection.Closed += async (sender, unexpected) => + { + if (!unexpected) return; + Assert.Equal(Status.Closed, connection.Status); + Assert.True(unexpected); + SystemUtils.Wait(); + await connection.EnsureConnectionAsync(); + completion.SetResult(connection.Status); + }; + + var connectionName = Guid.NewGuid().ToString(); + await connection.ConnectAsync(new AmqpAddressBuilder().ConnectionName(connectionName).Build()); + SystemUtils.WaitUntilConnectionIsKilled(connectionName); + + var result = await completion.Task.WaitAsync(TimeSpan.FromSeconds(25)); + Assert.Equal(Status.Open, result); + await connection.CloseAsync(); + Assert.Equal(Status.Closed, connection.Status); } } \ No newline at end of file diff --git a/Tests/ConnectionTests.cs b/Tests/ConnectionTests.cs index 770e1df..c9a9481 100644 --- a/Tests/ConnectionTests.cs +++ b/Tests/ConnectionTests.cs @@ -1,4 +1,8 @@ -using RabbitMQ.AMQP.Client; +using System; +using System.Net.Sockets; +using Amqp; +using Amqp.Framing; +using RabbitMQ.AMQP.Client; namespace Tests; @@ -10,12 +14,12 @@ public class ConnectionTests public void ValidateAddress() { AmqpAddress amqpAddress = new("localhost", 5672, "guest-user", - "guest-password", "path/", "amqp1", "connection_name"); + "guest-password", "vhost_1", "amqp1", "connection_name"); Assert.Equal("localhost", amqpAddress.Host()); Assert.Equal(5672, amqpAddress.Port()); Assert.Equal("guest-user", amqpAddress.User()); Assert.Equal("guest-password", amqpAddress.Password()); - Assert.Equal("path/", amqpAddress.Path()); + Assert.Equal("vhost_1", amqpAddress.VirtualHost()); Assert.Equal("amqp1", amqpAddress.Scheme()); AmqpAddress second = new("localhost", 5672, "guest-user", @@ -32,10 +36,10 @@ public void ValidateAddress() [Fact] public void ValidateAddressBuilder() { - AmqpAddress address = new AmqpAddressBuilder() + var address = new AmqpAddressBuilder() .Host("localhost") .Port(5672) - .Path("path/") + .VirtualHost("v1") .User("guest-t") .Password("guest-w") .Scheme("amqp1") @@ -45,7 +49,40 @@ public void ValidateAddressBuilder() Assert.Equal(5672, address.Port()); Assert.Equal("guest-t", address.User()); Assert.Equal("guest-w", address.Password()); - Assert.Equal("path/", address.Path()); + Assert.Equal("v1", address.VirtualHost()); Assert.Equal("amqp1", address.Scheme()); } + + [Fact] + public async void RaiseErrorsIfTheParametersAreNotValid() + { + AmqpConnection connection = new(); + await Assert.ThrowsAsync( async () => await connection.ConnectAsync(new + AmqpAddressBuilder().VirtualHost("wrong_vhost").Build())); + Assert.Equal(Status.Closed, connection.Status); + + + await Assert.ThrowsAsync( async () => await connection.ConnectAsync(new + AmqpAddressBuilder().Host("wrong_host").Build())); + Assert.Equal(Status.Closed, connection.Status); + + + await Assert.ThrowsAsync( async () => await connection.ConnectAsync(new + AmqpAddressBuilder().Password("wrong_password").Build())); + Assert.Equal(Status.Closed, connection.Status); + + await Assert.ThrowsAsync( async () => await connection.ConnectAsync(new + AmqpAddressBuilder().User("wrong_user").Build())); + Assert.Equal(Status.Closed, connection.Status); + + await Assert.ThrowsAsync( async () => await connection.ConnectAsync(new + AmqpAddressBuilder().Port(0).Build())); + Assert.Equal(Status.Closed, connection.Status); + + + await Assert.ThrowsAsync( async () => await connection.ConnectAsync(new + AmqpAddressBuilder().Scheme("wrong_scheme").Build())); + Assert.Equal(Status.Closed, connection.Status); + } + } \ No newline at end of file diff --git a/Tests/Utils.cs b/Tests/Utils.cs index 3acebc7..e170eba 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -130,11 +130,11 @@ public static async Task IsConnectionOpen(string connectionName) var result = await client.GetAsync("http://localhost:15672/api/connections"); if (!result.IsSuccessStatusCode) { - throw new XunitException(string.Format("HTTP GET failed: {0} {1}", result.StatusCode, - result.ReasonPhrase)); + throw new XunitException($"HTTP GET failed: {result.StatusCode} {result.ReasonPhrase}"); } - var obj = await JsonSerializer.DeserializeAsync(result.Content.ReadAsStream(), typeof(IEnumerable)); + var obj = await JsonSerializer.DeserializeAsync(await result.Content.ReadAsStreamAsync(), + typeof(IEnumerable)); if (obj == null) return false; var connections = obj as IEnumerable; isOpen = connections.Any(x => x.client_properties["connection_name"].Contains(connectionName)); @@ -193,6 +193,13 @@ public static async Task HttpKillConnections(string connectionName) return killed; } + public static int WaitUntilConnectionIsKilled(string connectionName) + { + Wait(); + WaitUntilAsync(async () => await HttpKillConnections(connectionName) == 1); + return 1; + } + private static HttpClient CreateHttpClient() { var handler = new HttpClientHandler { Credentials = new NetworkCredential("guest", "guest"), };