Skip to content

Commit 42a4ccf

Browse files
authored
fix bug in pause/resume. (#2233)
1 parent 758b6fd commit 42a4ccf

File tree

3 files changed

+38
-27
lines changed

3 files changed

+38
-27
lines changed

src/Microsoft.Azure.SignalR.Common/Utilities/PauseHandler.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
// Copyright (c) Microsoft. All rights reserved.
1+
// Copyright (c) Microsoft. All rights reserved.
22
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
33

44
using System.Threading;
55
using System.Threading.Tasks;
66

77
namespace Microsoft.Azure.SignalR;
88

9+
#nullable enable
10+
911
internal class PauseHandler
1012
{
1113
private readonly SemaphoreSlim _pauseSemaphore = new(1, 1);
@@ -14,9 +16,9 @@ internal class PauseHandler
1416

1517
private volatile int _pauseAcked = 1;
1618

17-
public bool ShouldReplyAck => Interlocked.CompareExchange(ref _pauseAcked, 1, 0) == 0;
19+
public bool ShouldReplyAck() => Interlocked.CompareExchange(ref _pauseAcked, 1, 0) == 0;
1820

19-
public async Task<bool> WaitAsync(int ms, CancellationToken ctoken) => _pauseSemaphore.Wait(0, ctoken) || await _pauseSemaphore.WaitAsync(ms, ctoken);
21+
public Task<bool> TryAcquire(int millisecondsTimeout) => _pauseSemaphore.WaitAsync(millisecondsTimeout);
2022

2123
public void Release() => _pauseSemaphore.Release();
2224

src/Microsoft.Azure.SignalR/ClientConnections/ClientConnectionContext.cs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) Microsoft. All rights reserved.
1+
// Copyright (c) Microsoft. All rights reserved.
22
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
33

44
using System;
@@ -261,7 +261,7 @@ public Task PauseAsync()
261261

262262
public Task PauseAckAsync()
263263
{
264-
if (_pauseHandler.ShouldReplyAck)
264+
if (_pauseHandler.ShouldReplyAck())
265265
{
266266
Log.OutgoingTaskPauseAck(Logger, ConnectionId);
267267
var message = new ConnectionFlowControlMessage(ConnectionId, ConnectionFlowControlOperation.PauseAck);
@@ -334,18 +334,26 @@ internal async Task ProcessOutgoingMessagesAsync(SignalRProtocol.IHubProtocol pr
334334
if (HandshakeResponseTask.IsCompleted)
335335
{
336336
var next = buffer;
337-
while (!buffer.IsEmpty && protocol.TryParseMessage(ref next, FakeInvocationBinder.Instance, out var message))
337+
338+
// we still want messages to successfully going out when application completes
339+
int waitTime = 0;
340+
while (!await _pauseHandler.TryAcquire(1000))
338341
{
339-
// we still want messages to successfully going out when application completes
340-
if (!await _pauseHandler.WaitAsync(StaticRandom.Next(500, 1500), OutgoingAborted))
342+
Log.OutgoingTaskPaused(Logger, ConnectionId);
343+
if (OutgoingAborted.IsCancellationRequested)
341344
{
342-
Log.OutgoingTaskPaused(Logger, ConnectionId);
343-
buffer = buffer.Slice(0);
344-
break;
345+
waitTime++;
346+
if (waitTime > 5)
347+
{
348+
OutgoingAborted.ThrowIfCancellationRequested();
349+
}
345350
}
346-
347-
try
351+
}
352+
try
353+
{
354+
while (!buffer.IsEmpty && protocol.TryParseMessage(ref next, FakeInvocationBinder.Instance, out var message))
348355
{
356+
349357
var messageType = message switch
350358
{
351359
SignalRProtocol.HubInvocationMessage => DataMessageType.Invocation,
@@ -363,10 +371,10 @@ internal async Task ProcessOutgoingMessagesAsync(SignalRProtocol.IHubProtocol pr
363371
_ => next,
364372
};
365373
}
366-
finally
367-
{
368-
_pauseHandler.Release();
369-
}
374+
}
375+
finally
376+
{
377+
_pauseHandler.Release();
370378
}
371379
}
372380
}
Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) Microsoft. All rights reserved.
1+
// Copyright (c) Microsoft. All rights reserved.
22
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
33

44
using System.Threading.Tasks;
@@ -13,22 +13,23 @@ public class PauseHandlerTests
1313
public async Task TestPauseAndResume()
1414
{
1515
var handler = new PauseHandler();
16-
Assert.False(handler.ShouldReplyAck);
16+
Assert.False(handler.ShouldReplyAck());
1717

1818
await handler.PauseAsync();
19-
Assert.False(await handler.WaitAsync(100, default));
19+
Assert.False(await handler.TryAcquire(100));
2020

21-
Assert.True(handler.ShouldReplyAck);
22-
Assert.False(handler.ShouldReplyAck); // ack only once
21+
Assert.True(handler.ShouldReplyAck());
22+
Assert.False(handler.ShouldReplyAck()); // ack only once
2323

2424
await handler.PauseAsync(); // pause can be called multiple times.
25-
Assert.False(await handler.WaitAsync(100, default));
26-
Assert.False(handler.ShouldReplyAck); // already acked previously
25+
Assert.False(await handler.TryAcquire(100));
26+
Assert.False(handler.ShouldReplyAck()); // already acked previously
2727

2828
await handler.ResumeAsync();
29-
Assert.True(await handler.WaitAsync(100, default));
30-
Assert.False(await handler.WaitAsync(100, default)); // only 1 parallel
29+
30+
Assert.True(await handler.TryAcquire(100));
31+
Assert.False(await handler.TryAcquire(100)); // only 1 parallel
3132
handler.Release();
32-
Assert.True(await handler.WaitAsync(100, default));
33+
Assert.True(await handler.TryAcquire(100));
3334
}
3435
}

0 commit comments

Comments
 (0)