Skip to content

Commit

Permalink
Merge pull request #40 from rabbitmq/misc
Browse files Browse the repository at this point in the history
Consumer handler messages async
  • Loading branch information
lukebakken authored Aug 13, 2024
2 parents 89b5478 + 4364927 commit c88a80b
Show file tree
Hide file tree
Showing 23 changed files with 391 additions and 286 deletions.
4 changes: 1 addition & 3 deletions .ci/ubuntu/rabbitmq.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ loopback_users = none
log.console = true
log.console.level = debug
log.file = /var/log/rabbitmq/rabbitmq.log
log.file.level = info
# log.connection.level = warning
# log.channel.level = warning
log.file.level = debug
log.exchange = false

listeners.tcp.default = 5672
Expand Down
4 changes: 1 addition & 3 deletions RabbitMQ.AMQP.Client/IConnectionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@ public interface IConnectionSettings : IEquatable<IConnectionSettings>
string? User { get; }
string? Password { get; }
string Scheme { get; }
string ConnectionName { get; }
string ContainerId { get; }
string Path { get; }
bool UseSsl { get; }
uint MaxFrameSize { get; }
SaslMechanism SaslMechanism { get; }
ITlsSettings? TlsSettings { get; }

IRecoveryConfiguration Recovery { get; }

}

/// <summary>
Expand Down
19 changes: 4 additions & 15 deletions RabbitMQ.AMQP.Client/IConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,18 @@
using Amqp.Listener;

namespace RabbitMQ.AMQP.Client;

public class ConsumerException(string message) : Exception(message);
public delegate void MessageHandler(IContext context, IMessage message);
public delegate Task MessageHandler(IContext context, IMessage message);

public interface IConsumer : ILifeCycle
{
void Pause();

void Unpause();

long UnsettledMessageCount { get; }
}

public interface IMessageHandler
{
void Handle(Context context, IMessage message);
}

public interface IContext
{
void Accept();

void Discard();

void Requeue();
Task AcceptAsync();
Task DiscardAsync();
Task RequeueAsync();
}
41 changes: 20 additions & 21 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,10 @@ await _semaphoreOpen.WaitAsync()
var open = new Open
{
HostName = $"vhost:{_connectionSettings.VirtualHost}",
ContainerId = _connectionSettings.ContainerId,
Properties = new Fields()
{
[new Symbol("connection_name")] = _connectionSettings.ConnectionName,
[new Symbol("connection_name")] = _connectionSettings.ContainerId,
}
};

Expand All @@ -216,15 +217,15 @@ [new Symbol("connection_name")] = _connectionSettings.ConnectionName,
open.MaxFrameSize = _connectionSettings.MaxFrameSize;
}

void onOpened(Amqp.IConnection connection, Open open1)
void OnOpened(Amqp.IConnection connection, Open open1)
{
Trace.WriteLine(TraceLevel.Verbose, $"Connection opened. Info: {ToString()}");
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is open");
OnNewStatus(State.Open, null);
}

var cf = new ConnectionFactory();

if (_connectionSettings.UseSsl && _connectionSettings.TlsSettings is not null)
if (_connectionSettings is { UseSsl: true, TlsSettings: not null })
{
cf.SSL.Protocols = _connectionSettings.TlsSettings.Protocols;
cf.SSL.CheckCertificateRevocation = _connectionSettings.TlsSettings.CheckCertificateRevocation;
Expand Down Expand Up @@ -254,19 +255,19 @@ void onOpened(Amqp.IConnection connection, Open open1)

try
{
_nativeConnection = await cf.CreateAsync((_connectionSettings as ConnectionSettings)?.Address, open: open, onOpened: onOpened)
_nativeConnection = await cf.CreateAsync((_connectionSettings as ConnectionSettings)?.Address, open: open, onOpened: OnOpened)
.ConfigureAwait(false);
}
catch (Exception ex)
{
throw new ConnectionException(
$"Connection failed. Info: {ToString()}", ex);
$"{ToString()} connection failed.", ex);
}

if (_nativeConnection.IsClosed)
{
throw new ConnectionException(
$"Connection failed. Info: {ToString()}, error: {_nativeConnection.Error}");
$"{ToString()} connection failed., error: {_nativeConnection.Error}");
}

await _management.OpenAsync()
Expand All @@ -276,8 +277,8 @@ await _management.OpenAsync()
}
catch (AmqpException e)
{
Trace.WriteLine(TraceLevel.Error, $"Error trying to connect. Info: {ToString()}, error: {e}");
throw new ConnectionException($"Error trying to connect. Info: {ToString()}, error: {e}");
Trace.WriteLine(TraceLevel.Error, $"{ToString()} - Error trying to connect, error: {e}");
throw new ConnectionException($"{ToString()} - Error trying to connect., error: {e}");
}
finally
{
Expand Down Expand Up @@ -310,8 +311,8 @@ await _semaphoreClose.WaitAsync()
if (error != null)
{
// we assume here that the connection is closed unexpectedly, since the error is not null
Trace.WriteLine(TraceLevel.Warning, $"connection is closed unexpectedly. " +
$"Info: {ToString()}");
Trace.WriteLine(TraceLevel.Warning, $"{ToString()} is closed unexpectedly. "
);
// we have to check if the recovery is active.
// The user may want to disable the recovery mechanism
Expand Down Expand Up @@ -349,9 +350,8 @@ await Task.Run(async () =>
int nextDelayMs = _connectionSettings.Recovery.GetBackOffDelayPolicy().Delay();
Trace.WriteLine(TraceLevel.Information,
$"Trying Recovering connection in {nextDelayMs} milliseconds, " +
$"attempt: {_connectionSettings.Recovery.GetBackOffDelayPolicy().CurrentAttempt}. " +
$"Info: {ToString()})");
$"{ToString()} is trying Recovering connection in {nextDelayMs} milliseconds, " +
$"attempt: {_connectionSettings.Recovery.GetBackOffDelayPolicy().CurrentAttempt}. ");
await Task.Delay(TimeSpan.FromMilliseconds(nextDelayMs))
.ConfigureAwait(false);
Expand All @@ -364,18 +364,18 @@ await OpenConnectionAsync()
catch (Exception e)
{
Trace.WriteLine(TraceLevel.Warning,
$"Error trying to recover connection {e}. Info: {this}");
$"{ToString()} Error trying to recover connection {e}");
}
}
_connectionSettings.Recovery.GetBackOffDelayPolicy().Reset();
string connectionDescription = connected ? "recovered" : "not recovered";
Trace.WriteLine(TraceLevel.Information,
$"Connection {connectionDescription}. Info: {ToString()}");
$"{ToString()} is {connectionDescription}");
if (!connected)
{
Trace.WriteLine(TraceLevel.Verbose, $"connection is closed. Info: {ToString()}");
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} connection is closed");
OnNewStatus(State.Closed,
new Error(ConnectionNotRecoveredCode,
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.Recovery}"));
Expand All @@ -388,7 +388,7 @@ await OpenConnectionAsync()
if (_connectionSettings.Recovery.IsTopologyActive())
{
Trace.WriteLine(TraceLevel.Information, $"Recovering topology. Info: {ToString()}");
Trace.WriteLine(TraceLevel.Information, $"{ToString()} Recovering topology");
var visitor = new Visitor(_management);
await _recordingTopologyListener.Accept(visitor)
.ConfigureAwait(false);
Expand All @@ -403,15 +403,14 @@ await _recordingTopologyListener.Accept(visitor)
}
catch (Exception e)
{
Trace.WriteLine(TraceLevel.Error, $"Error trying to reconnect entities {e}. Info: {this}");
Trace.WriteLine(TraceLevel.Error, $"{ToString()} error trying to reconnect entities {e}");
}
}).ConfigureAwait(false);
return;
}
Trace.WriteLine(TraceLevel.Verbose, $"connection is closed. Info: {ToString()}");
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is closed");
OnNewStatus(State.Closed, Utils.ConvertError(error));
}
finally
Expand Down
88 changes: 73 additions & 15 deletions RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,29 +42,43 @@ public override async Task OpenAsync()
{
try
{
TaskCompletionSource attachCompletedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
TaskCompletionSource<ReceiverLink> attachCompletedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

Attach attach = Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, Id, _filters);

void onAttached(ILink argLink, Attach argAttach)
{
attachCompletedTcs.SetResult();
if (argLink is ReceiverLink link)
{
attachCompletedTcs.SetResult(link);
}
else
{
// TODO create "internal bug" exception type?
var ex = new InvalidOperationException(
"invalid link in onAttached, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
attachCompletedTcs.SetException(ex);
}
}

ReceiverLink? tmpReceiverLink = null;
Task receiverLinkTask = Task.Run(() =>
{
_receiverLink = new ReceiverLink(_connection._nativePubSubSessions.GetOrCreateSession(), Id, attach, onAttached);
tmpReceiverLink = new ReceiverLink(_connection._nativePubSubSessions.GetOrCreateSession(), Id, attach, onAttached);
});

// TODO configurable timeout
TimeSpan waitSpan = TimeSpan.FromSeconds(5);

await attachCompletedTcs.Task.WaitAsync(waitSpan)
_receiverLink = await attachCompletedTcs.Task.WaitAsync(waitSpan)
.ConfigureAwait(false);

await receiverLinkTask.WaitAsync(waitSpan)
.ConfigureAwait(false);

System.Diagnostics.Debug.Assert(tmpReceiverLink != null);
System.Diagnostics.Debug.Assert(Object.ReferenceEquals(_receiverLink, tmpReceiverLink));

if (_receiverLink is null)
{
throw new ConsumerException($"{ToString()} Failed to create receiver link (null was returned)");
Expand All @@ -77,9 +91,10 @@ await receiverLinkTask.WaitAsync(waitSpan)
}
else
{
// TODO: Check the performance during the download messages
// The publisher is faster than the consumer
_receiverLink.Start(_initialCredits, OnReceiverLinkMessage);
_receiverLink.SetCredit(_initialCredits);

// TODO save / cancel task
_ = Task.Run(ProcessMessages);

await base.OpenAsync()
.ConfigureAwait(false);
Expand All @@ -91,11 +106,51 @@ await base.OpenAsync()
}
}

private void OnReceiverLinkMessage(IReceiverLink link, Message message)
private async Task ProcessMessages()
{
_unsettledMessageCounter.Increment();
IContext context = new DeliveryContext(link, message, _unsettledMessageCounter);
_messageHandler(context, new AmqpMessage(message));
try
{
if (_receiverLink is null)
{
// TODO is this a serious error?
return;
}

while (_receiverLink is { LinkState: LinkState.Attached })
{
TimeSpan timeout = TimeSpan.FromSeconds(60); // TODO configurable
Message? nativeMessage = await _receiverLink.ReceiveAsync(timeout).ConfigureAwait(false);
if (nativeMessage is null)
{
// this is not a problem, it is just a timeout.
// the timeout is set to 60 seconds.
// For the moment I'd trace it at some point we can remove it
Trace.WriteLine(TraceLevel.Verbose,
$"{ToString()}: Timeout {timeout.Seconds} s.. waiting for message.");
continue;
}

_unsettledMessageCounter.Increment();

IContext context = new DeliveryContext(_receiverLink, nativeMessage, _unsettledMessageCounter);
var amqpMessage = new AmqpMessage(nativeMessage);

// TODO catch exceptions thrown by handlers,
// then call exception handler?
await _messageHandler(context, amqpMessage).ConfigureAwait(false);
}
}
catch (Exception e)
{
if (State == State.Closing)
{
return;
}

Trace.WriteLine(TraceLevel.Error, $"{ToString()} Failed to process messages, {e}");
}

Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is closed.");
}

private string Id { get; } = Guid.NewGuid().ToString();
Expand All @@ -111,7 +166,7 @@ public void Pause()
if ((int)PauseStatus.UNPAUSED == Interlocked.CompareExchange(ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
(int)PauseStatus.PAUSING, (int)PauseStatus.UNPAUSED))
{
_receiverLink.SetCredit(credit: 0, autoRestore: false);
_receiverLink.SetCredit(credit: 0);

if ((int)PauseStatus.PAUSING != Interlocked.CompareExchange(ref Unsafe.As<PauseStatus, int>(ref _pauseStatus),
(int)PauseStatus.PAUSED, (int)PauseStatus.PAUSING))
Expand Down Expand Up @@ -163,8 +218,8 @@ public override async Task CloseAsync()

OnNewStatus(State.Closing, null);

// TODO timeout
await _receiverLink.CloseAsync()
// TODO global timeout for closing, other async actions?
await _receiverLink.CloseAsync(TimeSpan.FromSeconds(5))
.ConfigureAwait(false);

_receiverLink = null;
Expand All @@ -176,6 +231,9 @@ await _receiverLink.CloseAsync()

public override string ToString()
{
return $"Consumer{{Address='{_address}', id={Id} ConnectionName='{_connection}', State='{State}'}}";
return $"Consumer{{Address='{_address}', " +
$"id={Id}, " +
$"Connection='{_connection}', " +
$"State='{State}'}}";
}
}
7 changes: 6 additions & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public IConsumerBuilder Queue(string queue)
}


private MessageHandler _handler = (message, context) => { };
private MessageHandler? _handler;

public IConsumerBuilder MessageHandler(MessageHandler handler)
{
Expand All @@ -38,6 +38,11 @@ public IConsumerBuilder.IStreamOptions Stream()

public async Task<IConsumer> BuildAsync(CancellationToken cancellationToken = default)
{
if (_handler is null)
{
throw new ConsumerException("Message handler is not set");
}

string address = new AddressBuilder().Queue(_queue).Address();

AmqpConsumer consumer = new(connection, address, _handler, _initialCredits, _filters);
Expand Down
10 changes: 7 additions & 3 deletions RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ await EnsureReceiverLinkAsync()

_managementSession.Closed += (sender, error) =>
{
if (State != State.Closed)
if (State != State.Closed && error != null)
{
Trace.WriteLine(TraceLevel.Warning, $"Management session closed " +
$"with error: {Utils.ConvertError(error)} " +
Expand All @@ -151,11 +151,15 @@ private async Task ProcessResponses()
continue;
}

using (Message msg = await _receiverLink.ReceiveAsync().ConfigureAwait(false))
TimeSpan timeout = TimeSpan.FromSeconds(59);
using (Message msg = await _receiverLink.ReceiveAsync(timeout).ConfigureAwait(false))
{
if (msg == null)
{
Trace.WriteLine(TraceLevel.Warning, "Received null message");
// this is not a problem, it is just a timeout.
// the timeout is set to 60 seconds.
// For the moment I'd trace it at some point we can remove it
Trace.WriteLine(TraceLevel.Verbose, $"Management:Timeout {timeout.Seconds} s.. waiting for message.");
continue;
}

Expand Down
Loading

0 comments on commit c88a80b

Please sign in to comment.