Skip to content

Commit

Permalink
Send own pubsub message for online status updates instead of using js…
Browse files Browse the repository at this point in the history
…on set
  • Loading branch information
LucHeart committed Nov 6, 2024
1 parent fdd5df8 commit e6d3760
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 7 deletions.
2 changes: 1 addition & 1 deletion API/Realtime/RedisSubscriberService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public RedisSubscriberService(
public async Task StartAsync(CancellationToken cancellationToken)
{
await _subscriber.SubscribeAsync(RedisChannels.KeyEventExpired, (_, message) => { LucTask.Run(() => RunLogic(message, false)); });
await _subscriber.SubscribeAsync(RedisChannels.KeyEventJsonSet, (_, message) => { LucTask.Run(() => RunLogic(message, true)); });
await _subscriber.SubscribeAsync(RedisChannels.DeviceOnlineStatus, (_, message) => { LucTask.Run(() => RunLogic(message, true)); });
await _subscriber.SubscribeAsync(RedisChannels.KeyEventDel, (_, message) => { LucTask.Run(() => RunLogic(message, false)); });
}

Expand Down
7 changes: 7 additions & 0 deletions Common/Services/RedisPubSub/IRedisPubService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ namespace OpenShock.Common.Services.RedisPubSub;

public interface IRedisPubService
{
/// <summary>
/// Used when a device comes online or changes its connection details like, gateway, firmware version, etc.
/// </summary>
/// <param name="deviceId"></param>
/// <returns></returns>
public Task SendDeviceOnlineStatus(Guid deviceId);

/// <summary>
/// General shocker control
/// </summary>
Expand Down
1 change: 1 addition & 0 deletions Common/Services/RedisPubSub/RedisChannels.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public static class RedisChannels
public static readonly RedisChannel DeviceControl = new("msg-device-control", RedisChannel.PatternMode.Literal);
public static readonly RedisChannel DeviceCaptive = new("msg-device-control-captive", RedisChannel.PatternMode.Literal);
public static readonly RedisChannel DeviceUpdate = new("msg-device-update", RedisChannel.PatternMode.Literal);
public static readonly RedisChannel DeviceOnlineStatus = new("msg-device-online-status", RedisChannel.PatternMode.Literal);

// OTA
public static readonly RedisChannel DeviceOtaInstall = new("msg-device-ota-install", RedisChannel.PatternMode.Literal);
Expand Down
10 changes: 10 additions & 0 deletions Common/Services/RedisPubSub/RedisPubService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ public RedisPubService(IConnectionMultiplexer connectionMultiplexer)
_subscriber = connectionMultiplexer.GetSubscriber();
}

public Task SendDeviceOnlineStatus(Guid deviceId)
{
var redisMessage = new DeviceUpdatedMessage
{
Id = deviceId
};

return _subscriber.PublishAsync(RedisChannels.DeviceOnlineStatus, JsonSerializer.Serialize(redisMessage));
}

/// <inheritdoc />
public Task SendDeviceControl(Guid sender, IDictionary<Guid, IList<ControlMessage.ShockerControlInfo>> controlMessages)
{
Expand Down
21 changes: 19 additions & 2 deletions LiveControlGateway/Controllers/DeviceControllerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using OpenShock.Common.OpenShockDb;
using OpenShock.Common.Problems;
using OpenShock.Common.Redis;
using OpenShock.Common.Services.RedisPubSub;
using OpenShock.Common.Utils;
using OpenShock.LiveControlGateway.LifetimeManager;
using OpenShock.LiveControlGateway.Websocket;
Expand Down Expand Up @@ -44,6 +45,7 @@ public abstract class DeviceControllerBase<TIn, TOut> : FlatbuffersWebsocketBase
private readonly IRedisConnectionProvider _redisConnectionProvider;
private readonly IDbContextFactory<OpenShockContext> _dbContextFactory;
private readonly LCGConfig _lcgConfig;
private readonly IRedisPubService _redisPubService;

private readonly Timer _keepAliveTimeoutTimer = new(Duration.DeviceKeepAliveInitialTimeout);
private DateTimeOffset _connected = DateTimeOffset.UtcNow;
Expand Down Expand Up @@ -79,14 +81,15 @@ protected DeviceControllerBase(
ISerializer<TOut> outgoingSerializer,
IRedisConnectionProvider redisConnectionProvider,
IDbContextFactory<OpenShockContext> dbContextFactory,
IServiceProvider serviceProvider, LCGConfig lcgConfig

IServiceProvider serviceProvider, LCGConfig lcgConfig,
IRedisPubService redisPubService
) : base(logger, lifetime, incomingSerializer, outgoingSerializer)
{
_redisConnectionProvider = redisConnectionProvider;
_dbContextFactory = dbContextFactory;
ServiceProvider = serviceProvider;
_lcgConfig = lcgConfig;
_redisPubService = redisPubService;
_keepAliveTimeoutTimer.Elapsed += async (sender, args) =>
{
Logger.LogInformation("Keep alive timeout reached, closing websocket connection");
Expand Down Expand Up @@ -161,11 +164,18 @@ await deviceOnline.InsertAsync(new DeviceOnline
ConnectedAt = _connected,
UserAgent = _userAgent
}, Duration.DeviceKeepAliveTimeout);


await _redisPubService.SendDeviceOnlineStatus(CurrentDevice.Id);
return;
}

// We cannot rely on the json set anymore, since that also happens with uptime and latency
// as we dont want to send a device online status every time, we will do it here
online.Uptime = uptime;
online.Latency = latency;

var sendOnlineStatusUpdate = false;

if (online.FirmwareVersion != _firmwareVersion ||
online.Gateway != _lcgConfig.Lcg.Fqdn ||
Expand All @@ -177,9 +187,16 @@ await deviceOnline.InsertAsync(new DeviceOnline
online.ConnectedAt = _connected;
online.UserAgent = _userAgent;
Logger.LogInformation("Updated details of online device");

sendOnlineStatusUpdate = true;
}

await deviceOnline.UpdateAsync(online, Duration.DeviceKeepAliveTimeout);

if (sendOnlineStatusUpdate)
{
await _redisPubService.SendDeviceOnlineStatus(CurrentDevice.Id);
}
}

/// <inheritdoc />
Expand Down
6 changes: 4 additions & 2 deletions LiveControlGateway/Controllers/DeviceV1Controller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using OpenShock.Common.Models;
using OpenShock.Common.OpenShockDb;
using OpenShock.Common.Services.Ota;
using OpenShock.Common.Services.RedisPubSub;
using OpenShock.Common.Utils;
using OpenShock.Serialization.Deprecated.DoNotUse.V1;
using OpenShock.Serialization.Types;
Expand Down Expand Up @@ -38,15 +39,16 @@ public sealed class DeviceV1Controller : DeviceControllerBase<HubToGatewayMessag
/// <param name="userHubContext"></param>
/// <param name="serviceProvider"></param>
/// <param name="lcgConfig"></param>
/// <param name="redisPubService"></param>
public DeviceV1Controller(
ILogger<DeviceV1Controller> logger,
IHostApplicationLifetime lifetime,
IRedisConnectionProvider redisConnectionProvider,
IDbContextFactory<OpenShockContext> dbContextFactory,
IHubContext<UserHub, IUserHub> userHubContext,
IServiceProvider serviceProvider, LCGConfig lcgConfig)
IServiceProvider serviceProvider, LCGConfig lcgConfig, IRedisPubService redisPubService)
: base(logger, lifetime, HubToGatewayMessage.Serializer, GatewayToHubMessage.Serializer, redisConnectionProvider,
dbContextFactory, serviceProvider, lcgConfig)
dbContextFactory, serviceProvider, lcgConfig, redisPubService)
{
_userHubContext = userHubContext;
}
Expand Down
6 changes: 4 additions & 2 deletions LiveControlGateway/Controllers/DeviceV2Controller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using OpenShock.Common.Models;
using OpenShock.Common.OpenShockDb;
using OpenShock.Common.Services.Ota;
using OpenShock.Common.Services.RedisPubSub;
using OpenShock.Common.Utils;
using OpenShock.Serialization.Gateway;
using OpenShock.Serialization.Types;
Expand Down Expand Up @@ -42,15 +43,16 @@ public sealed class DeviceV2Controller : DeviceControllerBase<HubToGatewayMessag
/// <param name="userHubContext"></param>
/// <param name="serviceProvider"></param>
/// <param name="lcgConfig"></param>
/// <param name="redisPubService"></param>
public DeviceV2Controller(
ILogger<DeviceV2Controller> logger,
IHostApplicationLifetime lifetime,
IRedisConnectionProvider redisConnectionProvider,
IDbContextFactory<OpenShockContext> dbContextFactory,
IHubContext<UserHub, IUserHub> userHubContext,
IServiceProvider serviceProvider, LCGConfig lcgConfig)
IServiceProvider serviceProvider, LCGConfig lcgConfig, IRedisPubService redisPubService)
: base(logger, lifetime, HubToGatewayMessage.Serializer, GatewayToHubMessage.Serializer, redisConnectionProvider,
dbContextFactory, serviceProvider, lcgConfig)
dbContextFactory, serviceProvider, lcgConfig, redisPubService)
{
_userHubContext = userHubContext;
_pingTimer = new Timer(PingTimerElapsed, null, Duration.DevicePingInitialDelay, Duration.DevicePingPeriod);
Expand Down

0 comments on commit e6d3760

Please sign in to comment.