Skip to content

Commit 0aa2ae1

Browse files
committed
CU-868a3neq0 trying to fix eventing issue.
1 parent e87254a commit 0aa2ae1

24 files changed

+480
-290
lines changed

Core/Resgrid.Model/Providers/IRabbitInboundEventProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace Resgrid.Model.Providers
66
{
77
public interface IRabbitInboundEventProvider
88
{
9-
Task Start();
9+
Task Start(string clientName, string queueName);
1010
void RegisterForEvents(Func<int, string, Task> personnelStatusChanged,
1111
Func<int, string, Task> unitStatusChanged,
1212
Func<int, string, Task> callStatusChanged,

Providers/Resgrid.Providers.Bus.Rabbit/RabbitConnection.cs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,26 @@ internal class RabbitConnection
99
{
1010
private static IConnection _connection { get; set; }
1111
private static ConnectionFactory _factory { get; set; }
12-
private static object LOCK = new object();
12+
private readonly static object LOCK = new object();
1313

1414

15-
public static bool VerifyAndCreateClients()
15+
public static bool VerifyAndCreateClients(string clientName)
1616
{
1717
if (_connection != null && !_connection.IsOpen)
1818
{
19-
_connection?.Dispose();
20-
19+
_connection.Dispose();
2120
_connection = null;
2221
_factory = null;
2322
}
24-
23+
2524
if (_connection == null)
2625
{
2726
lock (LOCK)
2827
{
2928
try
3029
{
3130
_factory = new ConnectionFactory() { HostName = ServiceBusConfig.RabbitHostname, UserName = ServiceBusConfig.RabbitUsername, Password = ServiceBusConfig.RabbbitPassword };
32-
_connection = _factory.CreateConnection();
31+
_connection = _factory.CreateConnection(clientName);
3332
}
3433
catch (Exception ex)
3534
{
@@ -40,7 +39,7 @@ public static bool VerifyAndCreateClients()
4039
try
4140
{
4241
_factory = new ConnectionFactory() { HostName = ServiceBusConfig.RabbitHostname2, UserName = ServiceBusConfig.RabbitUsername, Password = ServiceBusConfig.RabbbitPassword };
43-
_connection = _factory.CreateConnection();
42+
_connection = _factory.CreateConnection(clientName);
4443
}
4544
catch (Exception ex2)
4645
{
@@ -51,7 +50,7 @@ public static bool VerifyAndCreateClients()
5150
try
5251
{
5352
_factory = new ConnectionFactory() { HostName = ServiceBusConfig.RabbitHostname3, UserName = ServiceBusConfig.RabbitUsername, Password = ServiceBusConfig.RabbbitPassword };
54-
_connection = _factory.CreateConnection();
53+
_connection = _factory.CreateConnection(clientName);
5554
}
5655
catch (Exception ex3)
5756
{
@@ -155,19 +154,18 @@ public static bool VerifyAndCreateClients()
155154
return false;
156155
}
157156

158-
public static IConnection CreateConnection()
157+
public static IConnection CreateConnection(string clientName)
159158
{
160159
if (_connection == null)
161-
VerifyAndCreateClients();
160+
VerifyAndCreateClients(clientName);
162161

163162
if (!_connection.IsOpen)
164163
{
165-
_connection?.Dispose();
166-
164+
_connection.Dispose();
167165
_connection = null;
168166
_factory = null;
169167

170-
VerifyAndCreateClients();
168+
VerifyAndCreateClients(clientName);
171169
}
172170

173171
return _connection;
Lines changed: 63 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Text;
33
using System.Threading;
4+
using System.Threading.Channels;
45
using System.Threading.Tasks;
56
using Newtonsoft.Json;
67
using RabbitMQ.Client;
@@ -15,7 +16,6 @@ namespace Resgrid.Providers.Bus.Rabbit
1516
{
1617
public class RabbitInboundEventProvider : IRabbitInboundEventProvider
1718
{
18-
//private ConnectionFactory _factory;
1919
private IConnection _connection;
2020
private IModel _channel;
2121

@@ -28,25 +28,25 @@ public class RabbitInboundEventProvider : IRabbitInboundEventProvider
2828
public Func<int, PersonnelLocationUpdatedEvent, Task> PersonnelLocationUpdated;
2929
public Func<int, UnitLocationUpdatedEvent, Task> UnitLocationUpdated;
3030

31-
public async Task Start()
31+
public async Task Start(string clientName, string queueName)
3232
{
33-
VerifyAndCreateClients();
34-
await StartMonitoring();
33+
VerifyAndCreateClients(clientName);
34+
await StartMonitoring(queueName);
3535
}
3636

37-
private void VerifyAndCreateClients()
37+
private void VerifyAndCreateClients(string clientName)
3838
{
3939
try
4040
{
41-
_connection = RabbitConnection.CreateConnection();
41+
_connection = RabbitConnection.CreateConnection(clientName);
4242

4343
if (_connection != null)
4444
{
4545
_channel = _connection.CreateModel();
4646

4747
if (_channel != null)
4848
{
49-
_channel.ExchangeDeclare(SetQueueNameForEnv(Topics.EventingTopic), "fanout");
49+
_channel.ExchangeDeclare(RabbitConnection.SetQueueNameForEnv(Topics.EventingTopic), "fanout");
5050
}
5151
}
5252
}
@@ -56,69 +56,69 @@ private void VerifyAndCreateClients()
5656
}
5757
}
5858

59-
private async Task StartMonitoring()
59+
private async Task StartMonitoring(string queueName)
6060
{
61-
if (SystemBehaviorConfig.ServiceBusType == ServiceBusTypes.Rabbit)
62-
{
63-
var queueName = _channel.QueueDeclare().QueueName;
61+
//var queueName = _channel.QueueDeclare().QueueName;
6462

65-
_channel.QueueBind(queue: queueName,
66-
exchange: SetQueueNameForEnv(Topics.EventingTopic),
67-
routingKey: "");
63+
var queue = _channel.QueueDeclare(RabbitConnection.SetQueueNameForEnv(queueName), durable: true,
64+
autoDelete: false, exclusive: false);
6865

69-
var consumer = new EventingBasicConsumer(_channel);
70-
consumer.Received += async (model, ea) =>
71-
{
72-
var body = ea.Body.ToArray();
73-
var message = Encoding.UTF8.GetString(body);
66+
_channel.QueueBind(queue: queue.QueueName,
67+
exchange: RabbitConnection.SetQueueNameForEnv(Topics.EventingTopic),
68+
routingKey: "");
69+
70+
var consumer = new EventingBasicConsumer(_channel);
71+
consumer.Received += async (model, ea) =>
72+
{
73+
var body = ea.Body.ToArray();
74+
var message = Encoding.UTF8.GetString(body);
7475

75-
var eventingMessage = JsonConvert.DeserializeObject<EventingMessage>(message);
76+
var eventingMessage = JsonConvert.DeserializeObject<EventingMessage>(message);
7677

77-
if (eventingMessage != null)
78+
if (eventingMessage != null)
79+
{
80+
switch ((EventingTypes)eventingMessage.Type)
7881
{
79-
switch ((EventingTypes)eventingMessage.Type)
80-
{
81-
case EventingTypes.PersonnelStatusUpdated:
82-
if (ProcessPersonnelStatusChanged != null)
83-
await ProcessPersonnelStatusChanged(eventingMessage.DepartmentId, eventingMessage.ItemId);
84-
break;
85-
case EventingTypes.UnitStatusUpdated:
86-
if (ProcessUnitStatusChanged != null)
87-
await ProcessUnitStatusChanged.Invoke(eventingMessage.DepartmentId, eventingMessage.ItemId);
88-
break;
89-
case EventingTypes.CallsUpdated:
90-
if (ProcessCallStatusChanged != null)
91-
await ProcessCallStatusChanged.Invoke(eventingMessage.DepartmentId, eventingMessage.ItemId);
92-
break;
93-
case EventingTypes.CallAdded:
94-
if (ProcessCallStatusChanged != null)
95-
await ProcessCallStatusChanged.Invoke(eventingMessage.DepartmentId, eventingMessage.ItemId);
96-
break;
97-
case EventingTypes.CallClosed:
98-
if (ProcessCallStatusChanged != null)
99-
await ProcessCallStatusChanged.Invoke(eventingMessage.DepartmentId, eventingMessage.ItemId);
100-
break;
101-
case EventingTypes.PersonnelStaffingUpdated:
102-
if (ProcessPersonnelStaffingChanged != null)
103-
await ProcessPersonnelStaffingChanged.Invoke(eventingMessage.DepartmentId, eventingMessage.ItemId);
104-
break;
105-
case EventingTypes.PersonnelLocationUpdated:
106-
if (PersonnelLocationUpdated != null)
107-
await PersonnelLocationUpdated.Invoke(eventingMessage.DepartmentId, JsonConvert.DeserializeObject<PersonnelLocationUpdatedEvent>(eventingMessage.Payload));
108-
break;
109-
case EventingTypes.UnitLocationUpdated:
110-
if (UnitLocationUpdated != null)
111-
await UnitLocationUpdated.Invoke(eventingMessage.DepartmentId, JsonConvert.DeserializeObject<UnitLocationUpdatedEvent>(eventingMessage.Payload));
112-
break;
113-
default:
114-
throw new ArgumentOutOfRangeException();
115-
}
82+
case EventingTypes.PersonnelStatusUpdated:
83+
if (ProcessPersonnelStatusChanged != null)
84+
await ProcessPersonnelStatusChanged(eventingMessage.DepartmentId, eventingMessage.ItemId);
85+
break;
86+
case EventingTypes.UnitStatusUpdated:
87+
if (ProcessUnitStatusChanged != null)
88+
await ProcessUnitStatusChanged.Invoke(eventingMessage.DepartmentId, eventingMessage.ItemId);
89+
break;
90+
case EventingTypes.CallsUpdated:
91+
if (ProcessCallStatusChanged != null)
92+
await ProcessCallStatusChanged.Invoke(eventingMessage.DepartmentId, eventingMessage.ItemId);
93+
break;
94+
case EventingTypes.CallAdded:
95+
if (ProcessCallAdded != null)
96+
await ProcessCallAdded.Invoke(eventingMessage.DepartmentId, eventingMessage.ItemId);
97+
break;
98+
case EventingTypes.CallClosed:
99+
if (ProcessCallClosed != null)
100+
await ProcessCallClosed.Invoke(eventingMessage.DepartmentId, eventingMessage.ItemId);
101+
break;
102+
case EventingTypes.PersonnelStaffingUpdated:
103+
if (ProcessPersonnelStaffingChanged != null)
104+
await ProcessPersonnelStaffingChanged.Invoke(eventingMessage.DepartmentId, eventingMessage.ItemId);
105+
break;
106+
case EventingTypes.PersonnelLocationUpdated:
107+
if (PersonnelLocationUpdated != null)
108+
await PersonnelLocationUpdated.Invoke(eventingMessage.DepartmentId, JsonConvert.DeserializeObject<PersonnelLocationUpdatedEvent>(eventingMessage.Payload));
109+
break;
110+
case EventingTypes.UnitLocationUpdated:
111+
if (UnitLocationUpdated != null)
112+
await UnitLocationUpdated.Invoke(eventingMessage.DepartmentId, JsonConvert.DeserializeObject<UnitLocationUpdatedEvent>(eventingMessage.Payload));
113+
break;
114+
default:
115+
throw new ArgumentOutOfRangeException();
116116
}
117-
};
118-
_channel.BasicConsume(queue: queueName,
119-
autoAck: true,
120-
consumer: consumer);
121-
}
117+
}
118+
};
119+
_channel.BasicConsume(queue: queue.QueueName,
120+
autoAck: true,
121+
consumer: consumer);
122122
}
123123

124124
public bool IsConnected()
@@ -147,17 +147,5 @@ public void RegisterForEvents(Func<int, string, Task> personnelStatusChanged,
147147
PersonnelLocationUpdated = personnelLocationUpdated;
148148
UnitLocationUpdated = unitLocationUpdated;
149149
}
150-
151-
private static string SetQueueNameForEnv(string cacheKey)
152-
{
153-
if (Config.SystemBehaviorConfig.Environment == SystemEnvironment.Dev)
154-
return $"DEV{cacheKey}";
155-
else if (Config.SystemBehaviorConfig.Environment == SystemEnvironment.QA)
156-
return $"QA{cacheKey}";
157-
else if (Config.SystemBehaviorConfig.Environment == SystemEnvironment.Staging)
158-
return $"ST{cacheKey}";
159-
160-
return cacheKey;
161-
}
162150
}
163151
}

Providers/Resgrid.Providers.Bus.Rabbit/RabbitInboundQueueProvider.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ namespace Resgrid.Providers.Bus.Rabbit
1414
{
1515
public class RabbitInboundQueueProvider
1616
{
17+
private string _clientName;
1718
private IModel _channel;
1819
public Func<CallQueueItem, Task> CallQueueReceived;
1920
public Func<MessageQueueItem, Task> MessageQueueReceived;
@@ -32,9 +33,10 @@ public RabbitInboundQueueProvider()
3233
RabbitOutboundQueueProvider provider = new RabbitOutboundQueueProvider();
3334
}
3435

35-
public async Task Start()
36+
public async Task Start(string clientName)
3637
{
37-
var connection = RabbitConnection.CreateConnection();
38+
_clientName = clientName;
39+
var connection = RabbitConnection.CreateConnection(clientName);
3840

3941
if (connection != null)
4042
{
@@ -584,7 +586,7 @@ private bool RetryQueueItem(BasicDeliverEventArgs ea, Exception mex)
584586
//var factory = new ConnectionFactory() { HostName = ServiceBusConfig.RabbitHostname, UserName = ServiceBusConfig.RabbitUsername, Password = ServiceBusConfig.RabbbitPassword };
585587
//using (var connection = RabbitConnection.CreateConnection())
586588
//{
587-
var connection = RabbitConnection.CreateConnection();
589+
var connection = RabbitConnection.CreateConnection(_clientName);
588590
if (connection != null)
589591
{
590592
using (var channel = connection.CreateModel())

Providers/Resgrid.Providers.Bus.Rabbit/RabbitOutboundQueueProvider.cs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ namespace Resgrid.Providers.Bus.Rabbit
1313
{
1414
public class RabbitOutboundQueueProvider : IRabbitOutboundQueueProvider
1515
{
16+
private readonly string _clientName = "Resgrid-Outbound";
17+
1618
public bool EnqueueCall(CallQueueItem callQueue)
1719
{
1820
string serializedObject = ObjectSerialization.Serialize(callQueue);
@@ -97,11 +99,6 @@ public bool EnqueueSecurityRefreshEvent(SecurityRefreshEvent securityRefreshEven
9799
return SendMessage(ServiceBusConfig.SecurityRefreshQueueName, serializedObject, false, "300000");
98100
}
99101

100-
public bool VerifyAndCreateClients()
101-
{
102-
return RabbitConnection.VerifyAndCreateClients();
103-
}
104-
105102
private bool SendMessage(string queueName, string message, bool durable = true, string expiration = "36000000")
106103
{
107104
if (String.IsNullOrWhiteSpace(queueName))
@@ -110,15 +107,9 @@ private bool SendMessage(string queueName, string message, bool durable = true,
110107
if (String.IsNullOrWhiteSpace(message))
111108
throw new ArgumentNullException("message");
112109

113-
//if (SystemBehaviorConfig.ServiceBusType == ServiceBusTypes.Rabbit)
114-
//{
115110
try
116111
{
117-
// TODO: Maybe? https://github.com/EasyNetQ/EasyNetQ -SJ
118-
//var factory = new ConnectionFactory() { HostName = ServiceBusConfig.RabbitHostname, UserName = ServiceBusConfig.RabbitUsername, Password = ServiceBusConfig.RabbbitPassword };
119-
//using (var connection = RabbitConnection.CreateConnection())
120-
//{
121-
var connection = RabbitConnection.CreateConnection();
112+
var connection = RabbitConnection.CreateConnection(_clientName);
122113
if (connection != null)
123114
{
124115
using (var channel = connection.CreateModel())
@@ -157,16 +148,17 @@ private bool SendMessage(string queueName, string message, bool durable = true,
157148
}
158149

159150
return false;
160-
//}
161151
}
162152
catch (Exception ex)
163153
{
164154
Logging.LogException(ex);
165155
return false;
166156
}
167-
//}
157+
}
168158

169-
//return false;
159+
bool IRabbitOutboundQueueProvider.VerifyAndCreateClients()
160+
{
161+
return RabbitConnection.VerifyAndCreateClients(_clientName);
170162
}
171163
}
172164
}

0 commit comments

Comments
 (0)