Skip to content

Added "exchange" and "routeKey" connection rabbitMQ #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Net.Security;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

Expand All @@ -14,13 +13,16 @@ public RabbitMQListener(
Action<ReadOnlyMemory<byte>> receive,
string rabbitMQHost,
string rabbitMQVHost,
int rabbitMQPort,
string rabbitMQUser,
int rabbitMQPort,
string rabbitMQUser,
string rabbitMQPassword,
string rabbitMQQueue,
string rabbitMQQueue,
string rabbitMQExchangeName,
string rabbitMQExchangeType,
string rabbitMQRouteKey,
bool isSsl,
bool isQueueDurable,
bool isQueueAutoDelete,
bool isQueueDurable,
bool isQueueAutoDelete,
bool isQueueExclusive,
bool isReceiveAutoAck)
{
Expand All @@ -36,17 +38,32 @@ public RabbitMQListener(
Enabled = isSsl
}
};

_connection = factory.CreateConnection();
_channel = _connection.CreateModel();

if (rabbitMQExchangeName != string.Empty)
_channel.ExchangeDeclare(
exchange: rabbitMQExchangeName,
type: rabbitMQExchangeType,
durable: isQueueDurable,
autoDelete: isQueueAutoDelete
);

_channel.QueueDeclare(
rabbitMQQueue,
durable: isQueueDurable,
rabbitMQQueue,
durable: isQueueDurable,
exclusive: isQueueExclusive,
autoDelete: isQueueAutoDelete,
autoDelete: isQueueAutoDelete,
arguments: null);

if (rabbitMQRouteKey != string.Empty)
_channel.QueueBind(
queue: rabbitMQQueue,
exchange: rabbitMQExchangeName,
routingKey: rabbitMQRouteKey
);

var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) => receive(ea.Body);
_channel.BasicConsume(rabbitMQQueue, autoAck: isReceiveAutoAck, consumer: consumer);
Expand All @@ -63,4 +80,4 @@ public void Dispose()
_connection?.Close();
}
}
}
}
28 changes: 27 additions & 1 deletion src/Seq.Input.RabbitMQ/RabbitMQInput.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.IO;
using System.Text;
using RabbitMQ.Client;
using Seq.Apps;

namespace Seq.Input.RabbitMQ
Expand Down Expand Up @@ -50,6 +51,27 @@ public class RabbitMQInput : SeqApp, IPublishJson, IDisposable
HelpText = "The RabbitMQ queue name to receive events from. The default is `Logs`.")]
public string RabbitMQQueue { get; set; } = "logs";

[SeqAppSetting(
DisplayName = "RabbitMQ exchange name",
IsOptional = true,
HelpText = "The name of the RabbitMQ exchange from which to pull events. This is the exchange " +
"where the messages are published.")]
public string rabbitMQExchangeName { get; set; } = "";

[SeqAppSetting(
DisplayName = "RabbitMQ exchange type",
IsOptional = true,
HelpText = "The type of the RabbitMQ exchange (e.g., direct, topic, fanout, or headers). " +
"Determines how messages are routed to the queue.")]
public string rabbitMQExchangeType { get; set; } = ExchangeType.Direct;

[SeqAppSetting(
DisplayName = "RabbitMQ Route key",
IsOptional = true,
HelpText = "The routing key used for binding the queue to the exchange. " +
"This key is used to route messages from the exchange to the queue.")]
public string rabbitMQRouteKey { get; set; } = "";

[SeqAppSetting(
DisplayName = "Require SSL",
IsOptional = true,
Expand Down Expand Up @@ -83,6 +105,7 @@ public class RabbitMQInput : SeqApp, IPublishJson, IDisposable
public void Start(TextWriter inputWriter)
{
var sync = new object();

void Receive(ReadOnlyMemory<byte> body)
{
try
Expand All @@ -107,6 +130,9 @@ void Receive(ReadOnlyMemory<byte> body)
RabbitMQUser,
RabbitMQPassword,
RabbitMQQueue,
rabbitMQExchangeName,
rabbitMQExchangeType,
rabbitMQRouteKey,
IsSsl,
IsQueueDurable,
IsQueueAutoDelete,
Expand All @@ -124,4 +150,4 @@ public void Dispose()
_listener?.Dispose();
}
}
}
}