From e6d376078034a530328905cac697288505ecd1c7 Mon Sep 17 00:00:00 2001 From: Luca Date: Wed, 6 Nov 2024 03:19:30 +0100 Subject: [PATCH] Send own pubsub message for online status updates instead of using json set --- API/Realtime/RedisSubscriberService.cs | 2 +- .../Services/RedisPubSub/IRedisPubService.cs | 7 +++++++ Common/Services/RedisPubSub/RedisChannels.cs | 1 + .../Services/RedisPubSub/RedisPubService.cs | 10 +++++++++ .../Controllers/DeviceControllerBase.cs | 21 +++++++++++++++++-- .../Controllers/DeviceV1Controller.cs | 6 ++++-- .../Controllers/DeviceV2Controller.cs | 6 ++++-- 7 files changed, 46 insertions(+), 7 deletions(-) diff --git a/API/Realtime/RedisSubscriberService.cs b/API/Realtime/RedisSubscriberService.cs index 8e4a465f..ada6c663 100644 --- a/API/Realtime/RedisSubscriberService.cs +++ b/API/Realtime/RedisSubscriberService.cs @@ -48,7 +48,7 @@ public RedisSubscriberService( public async Task StartAsync(CancellationToken cancellationToken) { await _subscriber.SubscribeAsync(RedisChannels.KeyEventExpired, (_, message) => { LucTask.Run(() => RunLogic(message, false)); }); - await _subscriber.SubscribeAsync(RedisChannels.KeyEventJsonSet, (_, message) => { LucTask.Run(() => RunLogic(message, true)); }); + await _subscriber.SubscribeAsync(RedisChannels.DeviceOnlineStatus, (_, message) => { LucTask.Run(() => RunLogic(message, true)); }); await _subscriber.SubscribeAsync(RedisChannels.KeyEventDel, (_, message) => { LucTask.Run(() => RunLogic(message, false)); }); } diff --git a/Common/Services/RedisPubSub/IRedisPubService.cs b/Common/Services/RedisPubSub/IRedisPubService.cs index 0badf388..96738908 100644 --- a/Common/Services/RedisPubSub/IRedisPubService.cs +++ b/Common/Services/RedisPubSub/IRedisPubService.cs @@ -5,6 +5,13 @@ namespace OpenShock.Common.Services.RedisPubSub; public interface IRedisPubService { + /// + /// Used when a device comes online or changes its connection details like, gateway, firmware version, etc. + /// + /// + /// + public Task SendDeviceOnlineStatus(Guid deviceId); + /// /// General shocker control /// diff --git a/Common/Services/RedisPubSub/RedisChannels.cs b/Common/Services/RedisPubSub/RedisChannels.cs index 522a03a0..4c2bc243 100644 --- a/Common/Services/RedisPubSub/RedisChannels.cs +++ b/Common/Services/RedisPubSub/RedisChannels.cs @@ -11,6 +11,7 @@ public static class RedisChannels public static readonly RedisChannel DeviceControl = new("msg-device-control", RedisChannel.PatternMode.Literal); public static readonly RedisChannel DeviceCaptive = new("msg-device-control-captive", RedisChannel.PatternMode.Literal); public static readonly RedisChannel DeviceUpdate = new("msg-device-update", RedisChannel.PatternMode.Literal); + public static readonly RedisChannel DeviceOnlineStatus = new("msg-device-online-status", RedisChannel.PatternMode.Literal); // OTA public static readonly RedisChannel DeviceOtaInstall = new("msg-device-ota-install", RedisChannel.PatternMode.Literal); diff --git a/Common/Services/RedisPubSub/RedisPubService.cs b/Common/Services/RedisPubSub/RedisPubService.cs index 0e65bd92..f99f05fa 100644 --- a/Common/Services/RedisPubSub/RedisPubService.cs +++ b/Common/Services/RedisPubSub/RedisPubService.cs @@ -18,6 +18,16 @@ public RedisPubService(IConnectionMultiplexer connectionMultiplexer) _subscriber = connectionMultiplexer.GetSubscriber(); } + public Task SendDeviceOnlineStatus(Guid deviceId) + { + var redisMessage = new DeviceUpdatedMessage + { + Id = deviceId + }; + + return _subscriber.PublishAsync(RedisChannels.DeviceOnlineStatus, JsonSerializer.Serialize(redisMessage)); + } + /// public Task SendDeviceControl(Guid sender, IDictionary> controlMessages) { diff --git a/LiveControlGateway/Controllers/DeviceControllerBase.cs b/LiveControlGateway/Controllers/DeviceControllerBase.cs index 9807ec8e..c2e6f063 100644 --- a/LiveControlGateway/Controllers/DeviceControllerBase.cs +++ b/LiveControlGateway/Controllers/DeviceControllerBase.cs @@ -14,6 +14,7 @@ using OpenShock.Common.OpenShockDb; using OpenShock.Common.Problems; using OpenShock.Common.Redis; +using OpenShock.Common.Services.RedisPubSub; using OpenShock.Common.Utils; using OpenShock.LiveControlGateway.LifetimeManager; using OpenShock.LiveControlGateway.Websocket; @@ -44,6 +45,7 @@ public abstract class DeviceControllerBase : FlatbuffersWebsocketBase private readonly IRedisConnectionProvider _redisConnectionProvider; private readonly IDbContextFactory _dbContextFactory; private readonly LCGConfig _lcgConfig; + private readonly IRedisPubService _redisPubService; private readonly Timer _keepAliveTimeoutTimer = new(Duration.DeviceKeepAliveInitialTimeout); private DateTimeOffset _connected = DateTimeOffset.UtcNow; @@ -79,14 +81,15 @@ protected DeviceControllerBase( ISerializer outgoingSerializer, IRedisConnectionProvider redisConnectionProvider, IDbContextFactory dbContextFactory, - IServiceProvider serviceProvider, LCGConfig lcgConfig - + IServiceProvider serviceProvider, LCGConfig lcgConfig, + IRedisPubService redisPubService ) : base(logger, lifetime, incomingSerializer, outgoingSerializer) { _redisConnectionProvider = redisConnectionProvider; _dbContextFactory = dbContextFactory; ServiceProvider = serviceProvider; _lcgConfig = lcgConfig; + _redisPubService = redisPubService; _keepAliveTimeoutTimer.Elapsed += async (sender, args) => { Logger.LogInformation("Keep alive timeout reached, closing websocket connection"); @@ -161,11 +164,18 @@ await deviceOnline.InsertAsync(new DeviceOnline ConnectedAt = _connected, UserAgent = _userAgent }, Duration.DeviceKeepAliveTimeout); + + + await _redisPubService.SendDeviceOnlineStatus(CurrentDevice.Id); return; } + // We cannot rely on the json set anymore, since that also happens with uptime and latency + // as we dont want to send a device online status every time, we will do it here online.Uptime = uptime; online.Latency = latency; + + var sendOnlineStatusUpdate = false; if (online.FirmwareVersion != _firmwareVersion || online.Gateway != _lcgConfig.Lcg.Fqdn || @@ -177,9 +187,16 @@ await deviceOnline.InsertAsync(new DeviceOnline online.ConnectedAt = _connected; online.UserAgent = _userAgent; Logger.LogInformation("Updated details of online device"); + + sendOnlineStatusUpdate = true; } await deviceOnline.UpdateAsync(online, Duration.DeviceKeepAliveTimeout); + + if (sendOnlineStatusUpdate) + { + await _redisPubService.SendDeviceOnlineStatus(CurrentDevice.Id); + } } /// diff --git a/LiveControlGateway/Controllers/DeviceV1Controller.cs b/LiveControlGateway/Controllers/DeviceV1Controller.cs index 0646c853..dd312428 100644 --- a/LiveControlGateway/Controllers/DeviceV1Controller.cs +++ b/LiveControlGateway/Controllers/DeviceV1Controller.cs @@ -8,6 +8,7 @@ using OpenShock.Common.Models; using OpenShock.Common.OpenShockDb; using OpenShock.Common.Services.Ota; +using OpenShock.Common.Services.RedisPubSub; using OpenShock.Common.Utils; using OpenShock.Serialization.Deprecated.DoNotUse.V1; using OpenShock.Serialization.Types; @@ -38,15 +39,16 @@ public sealed class DeviceV1Controller : DeviceControllerBase /// /// + /// public DeviceV1Controller( ILogger logger, IHostApplicationLifetime lifetime, IRedisConnectionProvider redisConnectionProvider, IDbContextFactory dbContextFactory, IHubContext userHubContext, - IServiceProvider serviceProvider, LCGConfig lcgConfig) + IServiceProvider serviceProvider, LCGConfig lcgConfig, IRedisPubService redisPubService) : base(logger, lifetime, HubToGatewayMessage.Serializer, GatewayToHubMessage.Serializer, redisConnectionProvider, - dbContextFactory, serviceProvider, lcgConfig) + dbContextFactory, serviceProvider, lcgConfig, redisPubService) { _userHubContext = userHubContext; } diff --git a/LiveControlGateway/Controllers/DeviceV2Controller.cs b/LiveControlGateway/Controllers/DeviceV2Controller.cs index cd4a7a8b..3be4a1af 100644 --- a/LiveControlGateway/Controllers/DeviceV2Controller.cs +++ b/LiveControlGateway/Controllers/DeviceV2Controller.cs @@ -10,6 +10,7 @@ using OpenShock.Common.Models; using OpenShock.Common.OpenShockDb; using OpenShock.Common.Services.Ota; +using OpenShock.Common.Services.RedisPubSub; using OpenShock.Common.Utils; using OpenShock.Serialization.Gateway; using OpenShock.Serialization.Types; @@ -42,15 +43,16 @@ public sealed class DeviceV2Controller : DeviceControllerBase /// /// + /// public DeviceV2Controller( ILogger logger, IHostApplicationLifetime lifetime, IRedisConnectionProvider redisConnectionProvider, IDbContextFactory dbContextFactory, IHubContext userHubContext, - IServiceProvider serviceProvider, LCGConfig lcgConfig) + IServiceProvider serviceProvider, LCGConfig lcgConfig, IRedisPubService redisPubService) : base(logger, lifetime, HubToGatewayMessage.Serializer, GatewayToHubMessage.Serializer, redisConnectionProvider, - dbContextFactory, serviceProvider, lcgConfig) + dbContextFactory, serviceProvider, lcgConfig, redisPubService) { _userHubContext = userHubContext; _pingTimer = new Timer(PingTimerElapsed, null, Duration.DevicePingInitialDelay, Duration.DevicePingPeriod);