Skip to content

Commit 70301c2

Browse files
committed
new logic for KeepAlive
1 parent 6061afd commit 70301c2

File tree

11 files changed

+315
-114
lines changed

11 files changed

+315
-114
lines changed

Directory.Build.props

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
<RepositoryUrl>https://github.com/managedcode/Orleans.SignalR</RepositoryUrl>
2626
<PackageProjectUrl>https://github.com/managedcode/Orleans.SignalR</PackageProjectUrl>
2727
<Product>Managed Code - Orleans SignalR</Product>
28-
<Version>10.0.1</Version>
29-
<PackageVersion>10.0.1</PackageVersion>
28+
<Version>10.0.2</Version>
29+
<PackageVersion>10.0.2</PackageVersion>
3030

3131
</PropertyGroup>
3232
<PropertyGroup Condition="'$(GITHUB_ACTIONS)' == 'true'">

ManagedCode.Orleans.SignalR.Core/Models/ConnectionHeartbeatRegistration.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
2+
using System.Collections.Immutable;
23
using ManagedCode.Orleans.SignalR.Core.Interfaces;
34
using Orleans;
5+
using Orleans.Runtime;
46

57
namespace ManagedCode.Orleans.SignalR.Core.Models;
68

@@ -10,4 +12,5 @@ public sealed record ConnectionHeartbeatRegistration(
1012
[property: Id(1)] bool UsePartitioning,
1113
[property: Id(2)] int PartitionId,
1214
[property: Id(3)] ISignalRObserver Observer,
13-
[property: Id(4)] TimeSpan Interval);
15+
[property: Id(4)] TimeSpan Interval,
16+
[property: Id(5)] ImmutableArray<GrainReference> GrainReferences);

ManagedCode.Orleans.SignalR.Core/SignalR/Observers/Subscription.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Collections.Immutable;
34
using ManagedCode.Orleans.SignalR.Core.Interfaces;
5+
using Orleans.Runtime;
46

57
namespace ManagedCode.Orleans.SignalR.Core.SignalR.Observers;
68

@@ -16,6 +18,12 @@ public class Subscription(SignalRObserver observer) : IDisposable
1618

1719
public ISignalRObserver Reference { get; private set; } = default!;
1820

21+
public string? HubKey { get; private set; }
22+
23+
public bool UsePartitioning { get; private set; }
24+
25+
public int PartitionId { get; private set; }
26+
1927
public IReadOnlyCollection<IObserverConnectionManager> Grains => _grains;
2028

2129
public void Dispose()
@@ -29,6 +37,9 @@ public void Dispose()
2937
observer?.Dispose();
3038
_grains?.Clear();
3139
Reference = null!;
40+
HubKey = null;
41+
UsePartitioning = false;
42+
PartitionId = 0;
3243
}
3344

3445
public void AddGrain(IObserverConnectionManager grain)
@@ -46,8 +57,31 @@ public void SetReference(ISignalRObserver reference)
4657
Reference = reference;
4758
}
4859

60+
public void SetConnectionMetadata(string hubKey, bool usePartitioning, int partitionId)
61+
{
62+
HubKey = hubKey;
63+
UsePartitioning = usePartitioning;
64+
PartitionId = partitionId;
65+
}
66+
4967
public SignalRObserver GetObserver()
5068
{
5169
return observer;
5270
}
71+
72+
public ImmutableArray<GrainReference> GetGrainSnapshot()
73+
{
74+
if (_grains.Count == 0)
75+
{
76+
return ImmutableArray<GrainReference>.Empty;
77+
}
78+
79+
var builder = ImmutableArray.CreateBuilder<GrainReference>(_grains.Count);
80+
foreach (var grain in _grains)
81+
{
82+
builder.Add((GrainReference)grain);
83+
}
84+
85+
return builder.MoveToImmutable();
86+
}
5387
}

ManagedCode.Orleans.SignalR.Core/SignalR/OrleansHubLifetimeManager.cs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,7 @@ public override async Task OnConnectedAsync(HubConnectionContext connection)
6666
await connectionHolderGrain.AddConnection(connection.ConnectionId, subscription.Reference);
6767
}
6868

69-
if (_orleansSignalOptions.Value.KeepEachConnectionAlive)
70-
{
71-
var heartbeatInterval = TimeIntervalHelper.GetClientTimeoutInterval(_orleansSignalOptions, _globalHubOptions, _hubOptions);
72-
var heartbeatGrain = NameHelperGenerator.GetConnectionHeartbeatGrain(_clusterClient, hubKey, connection.ConnectionId);
73-
var registration = new ConnectionHeartbeatRegistration(
74-
hubKey,
75-
usePartitions,
76-
partitionId,
77-
subscription.Reference,
78-
heartbeatInterval);
79-
await heartbeatGrain.Start(registration);
80-
}
69+
subscription.SetConnectionMetadata(hubKey, usePartitions, partitionId);
8170

8271
if (!string.IsNullOrEmpty(connection.UserIdentifier))
8372
{
@@ -86,6 +75,8 @@ public override async Task OnConnectedAsync(HubConnectionContext connection)
8675
await userGrain.AddConnection(connection.ConnectionId, subscription.Reference);
8776
_ = Task.Run(userGrain.RequestMessage);
8877
}
78+
79+
await UpdateConnectionHeartbeatAsync(connection.ConnectionId, subscription);
8980
}
9081

9182
public override async Task OnDisconnectedAsync(HubConnectionContext connection)
@@ -304,6 +295,8 @@ public override async Task AddToGroupAsync(string connectionId, string groupName
304295
await Task.Run(() => groupGrain.AddConnection(connectionId, subscription.Reference), cancellationToken);
305296
subscription.AddGrain(groupGrain);
306297
}
298+
299+
await UpdateConnectionHeartbeatAsync(connectionId, subscription);
307300
}
308301

309302
public override async Task RemoveFromGroupAsync(string connectionId, string groupName,
@@ -330,13 +323,15 @@ public override async Task RemoveFromGroupAsync(string connectionId, string grou
330323
if (!stillTracked)
331324
{
332325
subscription.RemoveGrain(partitionGrain);
326+
await UpdateConnectionHeartbeatAsync(connectionId, subscription);
333327
}
334328
}
335329
else
336330
{
337331
var groupGrain = NameHelperGenerator.GetSignalRGroupGrain<THub>(_clusterClient, groupName);
338332
await Task.Run(() => groupGrain.RemoveConnection(connectionId, subscription.Reference), cancellationToken);
339333
subscription.RemoveGrain(groupGrain);
334+
await UpdateConnectionHeartbeatAsync(connectionId, subscription);
340335
}
341336
}
342337

@@ -563,6 +558,27 @@ private static string GenerateInvocationId()
563558
return connection?.Features.Get<Subscription>();
564559
}
565560

561+
private Task UpdateConnectionHeartbeatAsync(string connectionId, Subscription subscription)
562+
{
563+
if (!_orleansSignalOptions.Value.KeepEachConnectionAlive || string.IsNullOrEmpty(subscription.HubKey))
564+
{
565+
return Task.CompletedTask;
566+
}
567+
568+
var hubKey = subscription.HubKey!;
569+
var heartbeatInterval = TimeIntervalHelper.GetClientTimeoutInterval(_orleansSignalOptions, _globalHubOptions, _hubOptions);
570+
var heartbeatGrain = NameHelperGenerator.GetConnectionHeartbeatGrain(_clusterClient, hubKey, connectionId);
571+
var registration = new ConnectionHeartbeatRegistration(
572+
hubKey,
573+
subscription.UsePartitioning,
574+
subscription.PartitionId,
575+
subscription.Reference,
576+
heartbeatInterval,
577+
subscription.GetGrainSnapshot());
578+
579+
return heartbeatGrain.Start(registration);
580+
}
581+
566582
private static bool IsLocalObjectReferenceException(Exception ex)
567583
{
568584
return ex is InvalidOperationException invalid &&

ManagedCode.Orleans.SignalR.Server/SignalRConnectionCoordinatorGrain.cs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ public class SignalRConnectionCoordinatorGrain(
2424
IClusterClient _) : Grain, ISignalRConnectionCoordinatorGrain
2525
{
2626
private readonly Dictionary<string, int> _connectionPartitions = new(StringComparer.Ordinal);
27-
private readonly ILogger<SignalRConnectionCoordinatorGrain> _logger = logger;
2827
private readonly int _connectionsPerPartitionHint = Math.Max(1, options.Value.ConnectionsPerPartitionHint);
2928
private uint _basePartitionCount;
3029
private int _currentPartitionCount;
@@ -54,7 +53,7 @@ public Task<int> GetPartitionForConnection(string connectionId)
5453

5554
if (stopwatch.Elapsed > TimeSpan.FromMilliseconds(500))
5655
{
57-
_logger.LogWarning(
56+
logger.LogWarning(
5857
"GetPartitionForConnection for {ConnectionId} took {Elapsed} (tracked={Tracked})",
5958
connectionId,
6059
stopwatch.Elapsed,
@@ -76,7 +75,7 @@ public async Task SendToAll(HubMessage message)
7675
.GroupBy(static kvp => kvp.Value)
7776
.Select(group => $"{group.Key}:{group.Count()}")
7877
.ToArray();
79-
_logger.LogInformation("Sending to all partitions {Distribution}", string.Join(",", distribution));
78+
logger.LogInformation("Sending to all partitions {Distribution}", string.Join(",", distribution));
8079

8180
var tasks = new List<Task>(partitions.Count);
8281
foreach (var partitionId in partitions)
@@ -161,10 +160,10 @@ public Task NotifyConnectionRemoved(string connectionId)
161160
{
162161
if (_connectionPartitions.Remove(connectionId))
163162
{
164-
_logger.LogDebug("Removed connection {ConnectionId} from coordinator mapping.", connectionId);
163+
logger.LogDebug("Removed connection {ConnectionId} from coordinator mapping.", connectionId);
165164
if (_connectionPartitions.Count == 0 && _currentPartitionCount != _basePartitionCount)
166165
{
167-
_logger.LogDebug("Resetting partition count to base value {PartitionCount} as no active connections remain.", _basePartitionCount);
166+
logger.LogDebug("Resetting partition count to base value {PartitionCount} as no active connections remain.", _basePartitionCount);
168167
_currentPartitionCount = (int)_basePartitionCount;
169168
}
170169
}
@@ -196,7 +195,7 @@ private int GetOrAssignPartition(string connectionId)
196195
partition = PartitionHelper.GetPartitionId(connectionId, (uint)partitionCount);
197196
_connectionPartitions[connectionId] = partition;
198197

199-
_logger.LogDebug("Assigned connection {ConnectionId} to partition {Partition} (partitionCount={PartitionCount})", connectionId, partition, partitionCount);
198+
logger.LogDebug("Assigned connection {ConnectionId} to partition {Partition} (partitionCount={PartitionCount})", connectionId, partition, partitionCount);
200199
return partition;
201200
}
202201

@@ -207,7 +206,7 @@ private int EnsurePartitionCapacity(int prospectiveConnections)
207206

208207
if (desired > _currentPartitionCount)
209208
{
210-
_logger.LogInformation(
209+
logger.LogInformation(
211210
"Increasing connection partition count from {OldPartitionCount} to {NewPartitionCount} for {ConnectionCount} tracked connections.",
212211
_currentPartitionCount,
213212
desired,

ManagedCode.Orleans.SignalR.Server/SignalRConnectionHeartbeatGrain.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@ namespace ManagedCode.Orleans.SignalR.Server;
1515
public class SignalRConnectionHeartbeatGrain(
1616
ILogger<SignalRConnectionHeartbeatGrain> logger) : Grain, ISignalRConnectionHeartbeatGrain
1717
{
18-
private readonly ILogger<SignalRConnectionHeartbeatGrain> _logger = logger;
1918
private ConnectionHeartbeatRegistration? _registration;
2019
private IDisposable? _timer;
2120

2221
public Task Start(ConnectionHeartbeatRegistration registration)
2322
{
2423
_registration = registration;
2524
ResetTimer(registration.Interval);
26-
_logger.LogDebug("Heartbeat started for connection grain {Key} (partitioned={Partitioned}, partitionId={PartitionId}).",
25+
logger.LogDebug("Heartbeat started for connection grain {Key} (partitioned={Partitioned}, partitionId={PartitionId}).",
2726
this.GetPrimaryKeyString(), registration.UsePartitioning, registration.PartitionId);
2827
return Task.CompletedTask;
2928
}
@@ -32,7 +31,7 @@ public Task Stop()
3231
{
3332
ResetTimer(null);
3433
_registration = null;
35-
_logger.LogDebug("Heartbeat stopped for connection grain {Key}.", this.GetPrimaryKeyString());
34+
logger.LogDebug("Heartbeat stopped for connection grain {Key}.", this.GetPrimaryKeyString());
3635
return Task.CompletedTask;
3736
}
3837

@@ -62,22 +61,23 @@ private async Task OnTimerTickAsync(object? _)
6261
return;
6362
}
6463

64+
var grains = _registration.GrainReferences;
65+
if (grains.IsDefaultOrEmpty)
66+
{
67+
return;
68+
}
69+
6570
try
6671
{
67-
if (_registration.UsePartitioning)
68-
{
69-
var partition = NameHelperGenerator.GetConnectionPartitionGrain(GrainFactory, _registration.HubKey, _registration.PartitionId);
70-
await partition.Ping(_registration.Observer);
71-
}
72-
else
72+
foreach (var grainReference in grains)
7373
{
74-
var holder = NameHelperGenerator.GetConnectionHolderGrain(GrainFactory, _registration.HubKey);
75-
await holder.Ping(_registration.Observer);
74+
var manager = grainReference.Cast<IObserverConnectionManager>();
75+
await manager.Ping(_registration.Observer);
7676
}
7777
}
7878
catch (Exception ex)
7979
{
80-
_logger.LogDebug(ex, "Heartbeat ping failed for connection grain {Key}.", this.GetPrimaryKeyString());
80+
logger.LogDebug(ex, "Heartbeat ping failed for connection grain {Key}.", this.GetPrimaryKeyString());
8181
}
8282
}
8383
}

ManagedCode.Orleans.SignalR.Server/SignalRConnectionHolderGrain.cs

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,35 +19,28 @@ namespace ManagedCode.Orleans.SignalR.Server;
1919

2020
[Reentrant]
2121
[GrainType($"ManagedCode.{nameof(SignalRConnectionHolderGrain)}")]
22-
public class SignalRConnectionHolderGrain : SignalRObserverGrainBase<SignalRConnectionHolderGrain>, ISignalRConnectionHolderGrain
22+
public class SignalRConnectionHolderGrain(
23+
ILogger<SignalRConnectionHolderGrain> logger,
24+
IOptions<OrleansSignalROptions> orleansSignalOptions,
25+
IOptions<HubOptions> hubOptions,
26+
[PersistentState(nameof(SignalRConnectionHolderGrain), OrleansSignalROptions.OrleansSignalRStorage)]
27+
IPersistentState<ConnectionState> stateStorage)
28+
: SignalRObserverGrainBase<SignalRConnectionHolderGrain>(logger, orleansSignalOptions, hubOptions), ISignalRConnectionHolderGrain
2329
{
24-
private readonly IPersistentState<ConnectionState> _stateStorage;
25-
26-
public SignalRConnectionHolderGrain(
27-
ILogger<SignalRConnectionHolderGrain> logger,
28-
IOptions<OrleansSignalROptions> orleansSignalOptions,
29-
IOptions<HubOptions> hubOptions,
30-
[PersistentState(nameof(SignalRConnectionHolderGrain), OrleansSignalROptions.OrleansSignalRStorage)]
31-
IPersistentState<ConnectionState> stateStorage)
32-
: base(logger, orleansSignalOptions, hubOptions)
33-
{
34-
_stateStorage = stateStorage;
35-
}
36-
37-
protected override int TrackedConnectionCount => _stateStorage.State.ConnectionIds.Count;
30+
protected override int TrackedConnectionCount => stateStorage.State.ConnectionIds.Count;
3831

3932
public Task AddConnection(string connectionId, ISignalRObserver observer)
4033
{
4134
Logs.AddConnection(Logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString(), connectionId);
42-
_stateStorage.State.ConnectionIds[connectionId] = observer.GetPrimaryKeyString();
35+
stateStorage.State.ConnectionIds[connectionId] = observer.GetPrimaryKeyString();
4336
TrackConnection(connectionId, observer);
4437
return Task.CompletedTask;
4538
}
4639

4740
public Task RemoveConnection(string connectionId, ISignalRObserver observer)
4841
{
4942
Logs.RemoveConnection(Logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString(), connectionId);
50-
_stateStorage.State.ConnectionIds.Remove(connectionId);
43+
stateStorage.State.ConnectionIds.Remove(connectionId);
5144
UntrackConnection(connectionId, observer);
5245
return Task.CompletedTask;
5346
}
@@ -80,7 +73,7 @@ public async Task SendToAllExcept(HubMessage message, string[] excludedConnectio
8073
var hashSet = new HashSet<string>();
8174
foreach (var connectionId in excludedConnectionIds)
8275
{
83-
if (_stateStorage.State.ConnectionIds.TryGetValue(connectionId, out var observer))
76+
if (stateStorage.State.ConnectionIds.TryGetValue(connectionId, out var observer))
8477
{
8578
hashSet.Add(observer);
8679
}
@@ -94,7 +87,7 @@ public async Task<bool> SendToConnection(HubMessage message, string connectionId
9487
{
9588
Logs.SendToConnection(Logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString(), connectionId);
9689

97-
if (!_stateStorage.State.ConnectionIds.TryGetValue(connectionId, out var observer))
90+
if (!stateStorage.State.ConnectionIds.TryGetValue(connectionId, out var observer))
9891
{
9992
return false;
10093
}
@@ -137,7 +130,7 @@ public async Task SendToConnections(HubMessage message, string[] connectionIds)
137130
var hashSet = new HashSet<string>();
138131
foreach (var connectionId in connectionIds)
139132
{
140-
if (_stateStorage.State.ConnectionIds.TryGetValue(connectionId, out var observer))
133+
if (stateStorage.State.ConnectionIds.TryGetValue(connectionId, out var observer))
141134
{
142135
hashSet.Add(observer);
143136
}
@@ -159,13 +152,13 @@ public override async Task OnDeactivateAsync(DeactivationReason reason, Cancella
159152
Logs.OnDeactivateAsync(Logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString());
160153
ClearObserverTracking();
161154

162-
if (ObserverManager.Count == 0 || _stateStorage.State.ConnectionIds.Count == 0)
155+
if (ObserverManager.Count == 0 || stateStorage.State.ConnectionIds.Count == 0)
163156
{
164-
await _stateStorage.ClearStateAsync(cancellationToken);
157+
await stateStorage.ClearStateAsync(cancellationToken);
165158
}
166159
else
167160
{
168-
await _stateStorage.WriteStateAsync(cancellationToken);
161+
await stateStorage.WriteStateAsync(cancellationToken);
169162
}
170163
}
171164

0 commit comments

Comments
 (0)