diff --git a/src/Seq.Input.RabbitMQ/RabbitMQListener.cs b/src/PipeOsorio.Seq.Input.RabbitMQ/RabbitMQListener.cs similarity index 61% rename from src/Seq.Input.RabbitMQ/RabbitMQListener.cs rename to src/PipeOsorio.Seq.Input.RabbitMQ/RabbitMQListener.cs index 30ed239..e775d34 100644 --- a/src/Seq.Input.RabbitMQ/RabbitMQListener.cs +++ b/src/PipeOsorio.Seq.Input.RabbitMQ/RabbitMQListener.cs @@ -1,5 +1,4 @@ using System; -using System.Net.Security; using RabbitMQ.Client; using RabbitMQ.Client.Events; @@ -14,13 +13,16 @@ public RabbitMQListener( Action> receive, string rabbitMQHost, string rabbitMQVHost, - int rabbitMQPort, - string rabbitMQUser, + int rabbitMQPort, + string rabbitMQUser, string rabbitMQPassword, - string rabbitMQQueue, + string rabbitMQQueue, + string rabbitMQExchangeName, + string rabbitMQExchangeType, + string rabbitMQRouteKey, bool isSsl, - bool isQueueDurable, - bool isQueueAutoDelete, + bool isQueueDurable, + bool isQueueAutoDelete, bool isQueueExclusive, bool isReceiveAutoAck) { @@ -36,17 +38,32 @@ public RabbitMQListener( Enabled = isSsl } }; - + _connection = factory.CreateConnection(); _channel = _connection.CreateModel(); + if (rabbitMQExchangeName != string.Empty) + _channel.ExchangeDeclare( + exchange: rabbitMQExchangeName, + type: rabbitMQExchangeType, + durable: isQueueDurable, + autoDelete: isQueueAutoDelete + ); + _channel.QueueDeclare( - rabbitMQQueue, - durable: isQueueDurable, + rabbitMQQueue, + durable: isQueueDurable, exclusive: isQueueExclusive, - autoDelete: isQueueAutoDelete, + autoDelete: isQueueAutoDelete, arguments: null); + if (rabbitMQRouteKey != string.Empty) + _channel.QueueBind( + queue: rabbitMQQueue, + exchange: rabbitMQExchangeName, + routingKey: rabbitMQRouteKey + ); + var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => receive(ea.Body); _channel.BasicConsume(rabbitMQQueue, autoAck: isReceiveAutoAck, consumer: consumer); @@ -63,4 +80,4 @@ public void Dispose() _connection?.Close(); } } -} +} \ No newline at end of file diff --git a/src/Seq.Input.RabbitMQ/RabbitMQInput.cs b/src/Seq.Input.RabbitMQ/RabbitMQInput.cs index edc6555..ee98404 100644 --- a/src/Seq.Input.RabbitMQ/RabbitMQInput.cs +++ b/src/Seq.Input.RabbitMQ/RabbitMQInput.cs @@ -1,6 +1,7 @@ using System; using System.IO; using System.Text; +using RabbitMQ.Client; using Seq.Apps; namespace Seq.Input.RabbitMQ @@ -50,6 +51,27 @@ public class RabbitMQInput : SeqApp, IPublishJson, IDisposable HelpText = "The RabbitMQ queue name to receive events from. The default is `Logs`.")] public string RabbitMQQueue { get; set; } = "logs"; + [SeqAppSetting( + DisplayName = "RabbitMQ exchange name", + IsOptional = true, + HelpText = "The name of the RabbitMQ exchange from which to pull events. This is the exchange " + + "where the messages are published.")] + public string rabbitMQExchangeName { get; set; } = ""; + + [SeqAppSetting( + DisplayName = "RabbitMQ exchange type", + IsOptional = true, + HelpText = "The type of the RabbitMQ exchange (e.g., direct, topic, fanout, or headers). " + + "Determines how messages are routed to the queue.")] + public string rabbitMQExchangeType { get; set; } = ExchangeType.Direct; + + [SeqAppSetting( + DisplayName = "RabbitMQ Route key", + IsOptional = true, + HelpText = "The routing key used for binding the queue to the exchange. " + + "This key is used to route messages from the exchange to the queue.")] + public string rabbitMQRouteKey { get; set; } = ""; + [SeqAppSetting( DisplayName = "Require SSL", IsOptional = true, @@ -83,6 +105,7 @@ public class RabbitMQInput : SeqApp, IPublishJson, IDisposable public void Start(TextWriter inputWriter) { var sync = new object(); + void Receive(ReadOnlyMemory body) { try @@ -107,6 +130,9 @@ void Receive(ReadOnlyMemory body) RabbitMQUser, RabbitMQPassword, RabbitMQQueue, + rabbitMQExchangeName, + rabbitMQExchangeType, + rabbitMQRouteKey, IsSsl, IsQueueDurable, IsQueueAutoDelete, @@ -124,4 +150,4 @@ public void Dispose() _listener?.Dispose(); } } -} +} \ No newline at end of file