-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathRabbitMQInput.cs
153 lines (133 loc) · 5.52 KB
/
RabbitMQInput.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
using System;
using System.IO;
using System.Text;
using RabbitMQ.Client;
using Seq.Apps;
namespace Seq.Input.RabbitMQ
{
/// <summary>
/// Seq input app that receives message bodies through a <see cref="RabbitMQListener"/>,
/// decodes each one as UTF-8 text and writes it as a single line to the writer
/// supplied to <see cref="Start"/>.
/// </summary>
[SeqApp("RabbitMQ Input",
    Description = "Pulls JSON-formatted events from a RabbitMQ queue. For details of the " +
                  "supported JSON schema, see " +
                  "https://github.com/serilog/serilog-formatting-compact/#format-details.")]
public class RabbitMQInput : SeqApp, IPublishJson, IDisposable
{
    // Created by Start; remains null until then, so every other use must be null-safe.
    RabbitMQListener _listener;

    /// <summary>Hostname passed to the listener. Defaults to <c>localhost</c>.</summary>
    [SeqAppSetting(
        DisplayName = "RabbitMQ host",
        IsOptional = true,
        HelpText = "The hostname on which RabbitMQ is running. The default is `localhost`.")]
    public string RabbitMQHost { get; set; } = "localhost";

    /// <summary>Virtual host passed to the listener. Defaults to <c>/</c>.</summary>
    [SeqAppSetting(
        DisplayName = "RabbitMQ Virtual Host",
        IsOptional = true,
        HelpText = "The virtual host in RabbitMQ. The default is `/`.")]
    public string RabbitMQVHost { get; set; } = "/";

    /// <summary>Port passed to the listener. Defaults to <c>5672</c>.</summary>
    [SeqAppSetting(
        DisplayName = "RabbitMQ port",
        IsOptional = true,
        HelpText = "The port on which the RabbitMQ server is listening. The default is `5672`.")]
    public int RabbitMQPort { get; set; } = 5672;

    /// <summary>Username passed to the listener. Defaults to <c>guest</c>.</summary>
    [SeqAppSetting(
        DisplayName = "RabbitMQ user",
        IsOptional = true,
        HelpText = "The username provided when connecting to RabbitMQ. The default is `guest`.")]
    public string RabbitMQUser { get; set; } = "guest";

    /// <summary>Password passed to the listener. Defaults to <c>guest</c>.</summary>
    [SeqAppSetting(
        DisplayName = "RabbitMQ password",
        IsOptional = true,
        InputType = SettingInputType.Password,
        HelpText = "The password provided when connecting to RabbitMQ. The default is `guest`.")]
    public string RabbitMQPassword { get; set; } = "guest";

    /// <summary>Queue name passed to the listener. Defaults to <c>logs</c> (lower case).</summary>
    [SeqAppSetting(
        DisplayName = "RabbitMQ queue",
        IsOptional = true,
        HelpText = "The RabbitMQ queue name to receive events from. The default is `logs`.")]
    public string RabbitMQQueue { get; set; } = "logs";

    // The three exchange-related properties below start with a lower-case letter, unlike the
    // other settings. They are kept as-is because renaming a public property would change
    // this class's public interface.

    /// <summary>Exchange name passed to the listener. Defaults to the empty string.</summary>
    [SeqAppSetting(
        DisplayName = "RabbitMQ exchange name",
        IsOptional = true,
        HelpText = "The name of the RabbitMQ exchange from which to pull events. This is the exchange " +
                   "where the messages are published.")]
    public string rabbitMQExchangeName { get; set; } = "";

    /// <summary>Exchange type passed to the listener. Defaults to <c>ExchangeType.Direct</c>.</summary>
    [SeqAppSetting(
        DisplayName = "RabbitMQ exchange type",
        IsOptional = true,
        HelpText = "The type of the RabbitMQ exchange (e.g., direct, topic, fanout, or headers). " +
                   "Determines how messages are routed to the queue.")]
    public string rabbitMQExchangeType { get; set; } = ExchangeType.Direct;

    /// <summary>Routing key passed to the listener. Defaults to the empty string.</summary>
    [SeqAppSetting(
        DisplayName = "RabbitMQ Route key",
        IsOptional = true,
        HelpText = "The routing key used for binding the queue to the exchange. " +
                   "This key is used to route messages from the exchange to the queue.")]
    public string rabbitMQRouteKey { get; set; } = "";

    /// <summary>SSL flag passed to the listener. Defaults to <c>false</c>.</summary>
    [SeqAppSetting(
        DisplayName = "Require SSL",
        IsOptional = true,
        HelpText = "Whether or not the connection is with SSL. The default is false.")]
    public bool IsSsl { get; set; }

    /// <summary>Queue "durable" flag passed to the listener. Defaults to <c>false</c>.</summary>
    [SeqAppSetting(
        DisplayName = "Durable",
        IsOptional = true,
        HelpText = "Whether or not the queue is durable. The default is false.")]
    public bool IsQueueDurable { get; set; }

    /// <summary>Queue "exclusive" flag passed to the listener. Defaults to <c>false</c>.</summary>
    [SeqAppSetting(
        DisplayName = "Exclusive",
        IsOptional = true,
        HelpText = "Whether or not the queue is exclusive. The default is false.")]
    public bool IsQueueExclusive { get; set; }

    /// <summary>Queue "auto-delete" flag passed to the listener. Defaults to <c>false</c>.</summary>
    [SeqAppSetting(
        DisplayName = "Auto-delete",
        IsOptional = true,
        HelpText = "Whether or not the queue is auto-deleted. The default is false.")]
    public bool IsQueueAutoDelete { get; set; }

    /// <summary>Auto-acknowledge flag passed to the listener. Defaults to <c>true</c>.</summary>
    [SeqAppSetting(
        DisplayName = "Auto-ACK",
        IsOptional = true,
        HelpText = "Whether or not messages should be auto-acknowledged. The default is true.")]
    public bool IsReceiveAutoAck { get; set; } = true;

    /// <summary>
    /// Creates the <see cref="RabbitMQListener"/> from the current settings and wires it to
    /// write every received message body to <paramref name="inputWriter"/>, one per line.
    /// </summary>
    /// <param name="inputWriter">Writer that receives each decoded message as a line of text.</param>
    /// <exception cref="ArgumentNullException"><paramref name="inputWriter"/> is null.</exception>
    public void Start(TextWriter inputWriter)
    {
        // Fail fast: without this, every message would fail inside Receive and be logged
        // as a decoding error, hiding the real cause.
        if (inputWriter == null)
        {
            throw new ArgumentNullException(nameof(inputWriter));
        }

        var sync = new object();

        void Receive(ReadOnlyMemory<byte> body)
        {
            try
            {
                // Decoding touches no shared state, so it is done before taking the lock.
                var clef = Encoding.UTF8.GetString(body.ToArray());

                // Serialize writes so concurrent callbacks cannot interleave lines.
                lock (sync)
                {
                    inputWriter.WriteLine(clef);
                }
            }
            catch (Exception ex)
            {
                // A single bad message is logged and dropped rather than propagated.
                Log.Error(ex, "A received message could not be decoded");
            }
        }

        // NOTE(review): the queue flags are passed as durable, auto-delete, exclusive, which
        // differs from the order the settings are declared in above. Confirm this matches the
        // RabbitMQListener constructor's parameter order.
        _listener = new RabbitMQListener(
            Receive,
            RabbitMQHost,
            RabbitMQVHost,
            RabbitMQPort,
            RabbitMQUser,
            RabbitMQPassword,
            RabbitMQQueue,
            rabbitMQExchangeName,
            rabbitMQExchangeType,
            rabbitMQRouteKey,
            IsSsl,
            IsQueueDurable,
            IsQueueAutoDelete,
            IsQueueExclusive,
            IsReceiveAutoAck);
    }

    /// <summary>
    /// Closes the listener. Does nothing if <see cref="Start"/> never created one.
    /// </summary>
    public void Stop()
    {
        // Null-conditional: Start may not have run, or may have thrown before assigning the field.
        _listener?.Close();
    }

    /// <summary>
    /// Disposes the listener if one was created.
    /// </summary>
    public void Dispose()
    {
        _listener?.Dispose();
    }
}
}