Skip to content

Commit 26d1b38

Browse files
committed
Switch back to an implementation inspired by the System/Net/Quic/Internal/ValueTaskSource.cs
1 parent addb51f commit 26d1b38

File tree

2 files changed

+115
-19
lines changed

2 files changed

+115
-19
lines changed

projects/RabbitMQ.Client/Impl/AsyncManualResetEvent.cs

Lines changed: 113 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,44 +29,140 @@
2929
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System;
33+
using System.Runtime.CompilerServices;
3234
using System.Threading;
3335
using System.Threading.Tasks;
36+
using System.Threading.Tasks.Sources;
3437

3538
namespace RabbitMQ.Client.Impl
3639
{
37-
sealed class AsyncManualResetEvent
40+
sealed class AsyncManualResetEvent(bool initialState = false) : IValueTaskSource
3841
{
39-
volatile TaskCompletionSource<bool> _taskCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
42+
private readonly object _lock = new();
4043

41-
public AsyncManualResetEvent(bool initialState = false)
44+
private State _state = initialState ? State.Set : State.Reset;
45+
46+
// Do not make this field readonly
47+
private ManualResetValueTaskSourceCore<bool> _valueTaskSource = new() { RunContinuationsAsynchronously = true };
48+
49+
public bool IsSet => (State)Volatile.Read(ref Unsafe.As<State, byte>(ref _state)) == State.Set;
50+
51+
public async ValueTask WaitAsync(CancellationToken cancellationToken = default)
4252
{
43-
if (initialState)
53+
ValueTask valueTask;
54+
CancellationTokenRegistration tokenRegistration = default;
55+
56+
lock (_lock)
57+
{
58+
// If already set, return immediately
59+
if (_state == State.Set)
60+
{
61+
return;
62+
}
63+
64+
// Create the ValueTask with current version
65+
valueTask = new ValueTask(this, _valueTaskSource.Version);
66+
67+
// Only transition to Awaiting if we're in Reset state
68+
if (_state == State.Reset)
69+
{
70+
_state = State.Awaiting;
71+
72+
// Register cancellation if token can be cancelled
73+
if (cancellationToken.CanBeCanceled)
74+
{
75+
#if NET
76+
tokenRegistration = cancellationToken.UnsafeRegister(
77+
static state =>
78+
{
79+
(AsyncManualResetEvent amre, CancellationToken token) =
80+
(Tuple<AsyncManualResetEvent, CancellationToken>)state!;
81+
amre.SetCancelled(token);
82+
}, state: Tuple.Create(this, cancellationToken));
83+
#else
84+
tokenRegistration = cancellationToken.Register(
85+
static state =>
86+
{
87+
(AsyncManualResetEvent amre, CancellationToken token) =
88+
(Tuple<AsyncManualResetEvent, CancellationToken>)state!;
89+
amre.SetCancelled(token);
90+
},
91+
state: Tuple.Create(this, cancellationToken), useSynchronizationContext: false);
92+
#endif
93+
}
94+
}
95+
}
96+
97+
try
4498
{
45-
_taskCompletionSource.SetResult(true);
99+
await valueTask.ConfigureAwait(false);
100+
}
101+
finally
102+
{
103+
// Dispose cancellation registration outside of lock to avoid deadlock
104+
if (tokenRegistration != default)
105+
{
106+
#if NET
107+
await tokenRegistration.DisposeAsync().ConfigureAwait(false);
108+
#else
109+
tokenRegistration.Dispose();
110+
#endif
111+
}
46112
}
47113
}
48114

49-
public bool IsSet => _taskCompletionSource.Task.IsCompleted;
50-
51-
public Task WaitAsync(CancellationToken cancellationToken = default)
115+
public void Set()
52116
{
53-
Task<bool> task = _taskCompletionSource.Task;
54-
return task.IsCompleted ? task : task.WaitAsync(cancellationToken);
55-
}
117+
lock (_lock)
118+
{
119+
State previousState = _state;
120+
_state = State.Set;
56121

57-
public void Set() => _taskCompletionSource.TrySetResult(true);
122+
// Only set result if we were in Awaiting state
123+
if (previousState == State.Awaiting)
124+
{
125+
_valueTaskSource.SetResult(true);
126+
}
127+
}
128+
}
58129

59130
public void Reset()
60131
{
61-
while (true)
132+
lock (_lock)
62133
{
63-
TaskCompletionSource<bool> currentTcs = _taskCompletionSource;
64-
if (!currentTcs.Task.IsCompleted ||
65-
Interlocked.CompareExchange(ref _taskCompletionSource, new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously), currentTcs) == currentTcs)
134+
if (_state == State.Set)
66135
{
67-
return;
136+
_state = State.Reset;
137+
_valueTaskSource.Reset();
138+
}
139+
}
140+
}
141+
142+
void SetCancelled(CancellationToken cancellationToken)
143+
{
144+
lock (_lock)
145+
{
146+
if (_state == State.Awaiting)
147+
{
148+
_state = State.Reset; // Reset to allow future waits
149+
_valueTaskSource.SetException(new OperationCanceledException(cancellationToken));
68150
}
69151
}
70152
}
153+
154+
void IValueTaskSource.GetResult(short token) => _valueTaskSource.GetResult(token);
155+
156+
ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _valueTaskSource.GetStatus(token);
157+
158+
void IValueTaskSource.OnCompleted(Action<object?> continuation, object? state, short token,
159+
ValueTaskSourceOnCompletedFlags flags) => _valueTaskSource.OnCompleted(continuation, state, token, flags);
160+
161+
enum State : byte
162+
{
163+
Reset,
164+
Set,
165+
Awaiting
166+
}
71167
}
72168
}

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,11 +229,11 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
229229
}
230230

231231
[MethodImpl(MethodImplOptions.AggressiveInlining)]
232-
private Task MaybeEnforceFlowControlAsync(CancellationToken cancellationToken)
232+
private ValueTask MaybeEnforceFlowControlAsync(CancellationToken cancellationToken)
233233
{
234234
if (_flowControlBlock.IsSet)
235235
{
236-
return Task.CompletedTask;
236+
return default;
237237
}
238238

239239
return _flowControlBlock.WaitAsync(cancellationToken);

0 commit comments

Comments
 (0)