Skip to content

Commit addb51f

Browse files
committed
Revert to the original version of AsyncManualResetEvent
1 parent 72837c5 commit addb51f

File tree

2 files changed

+17
-100
lines changed

2 files changed

+17
-100
lines changed

projects/RabbitMQ.Client/Impl/AsyncManualResetEvent.cs

Lines changed: 15 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -29,127 +29,44 @@
2929
// Copyright (c) 2007-2025 Broadcom. All Rights Reserved.
3030
//---------------------------------------------------------------------------
3131

32-
using System;
33-
using System.Diagnostics.CodeAnalysis;
3432
using System.Threading;
3533
using System.Threading.Tasks;
36-
using System.Threading.Tasks.Sources;
3734

3835
namespace RabbitMQ.Client.Impl
3936
{
40-
sealed class AsyncManualResetEvent : IValueTaskSource
37+
sealed class AsyncManualResetEvent
4138
{
42-
private ManualResetValueTaskSourceCore<bool> _valueTaskSource;
43-
private bool _isSet;
39+
volatile TaskCompletionSource<bool> _taskCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
4440

4541
public AsyncManualResetEvent(bool initialState = false)
4642
{
47-
_isSet = initialState;
48-
_valueTaskSource.Reset();
4943
if (initialState)
5044
{
51-
_valueTaskSource.SetResult(true);
45+
_taskCompletionSource.SetResult(true);
5246
}
5347
}
5448

55-
public bool IsSet => Volatile.Read(ref _isSet);
49+
public bool IsSet => _taskCompletionSource.Task.IsCompleted;
5650

57-
public async ValueTask WaitAsync(CancellationToken cancellationToken)
51+
public Task WaitAsync(CancellationToken cancellationToken = default)
5852
{
59-
if (IsSet)
60-
{
61-
return;
62-
}
63-
64-
cancellationToken.ThrowIfCancellationRequested();
65-
66-
CancellationTokenRegistration tokenRegistration =
67-
#if NET
68-
cancellationToken.UnsafeRegister(
69-
static state =>
70-
{
71-
var (source, token) = ((ManualResetValueTaskSourceCore<bool>, CancellationToken))state!;
72-
source.SetException(new OperationCanceledException(token));
73-
}, (_valueTaskSource, cancellationToken));
74-
#else
75-
cancellationToken.Register(
76-
static state =>
77-
{
78-
var (source, token) = ((ManualResetValueTaskSourceCore<bool>, CancellationToken))state!;
79-
source.SetException(new OperationCanceledException(token));
80-
},
81-
state: (_valueTaskSource, cancellationToken), useSynchronizationContext: false);
82-
#endif
83-
try
84-
{
85-
await new ValueTask(this, _valueTaskSource.Version)
86-
.ConfigureAwait(false);
87-
}
88-
finally
89-
{
90-
#if NET
91-
await tokenRegistration.DisposeAsync()
92-
.ConfigureAwait(false);
93-
#else
94-
tokenRegistration.Dispose();
95-
#endif
96-
}
53+
Task<bool> task = _taskCompletionSource.Task;
54+
return task.IsCompleted ? task : task.WaitAsync(cancellationToken);
9755
}
9856

99-
public void Set()
100-
{
101-
if (IsSet)
102-
{
103-
return;
104-
}
105-
106-
Volatile.Write(ref _isSet, true);
107-
_valueTaskSource.SetResult(true);
108-
}
57+
public void Set() => _taskCompletionSource.TrySetResult(true);
10958

11059
public void Reset()
11160
{
112-
if (!IsSet)
61+
while (true)
11362
{
114-
return;
63+
TaskCompletionSource<bool> currentTcs = _taskCompletionSource;
64+
if (!currentTcs.Task.IsCompleted ||
65+
Interlocked.CompareExchange(ref _taskCompletionSource, new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously), currentTcs) == currentTcs)
66+
{
67+
return;
68+
}
11569
}
116-
117-
Volatile.Write(ref _isSet, false);
118-
_valueTaskSource.Reset();
11970
}
120-
121-
void IValueTaskSource.GetResult(short token)
122-
{
123-
if (token != _valueTaskSource.Version)
124-
{
125-
ThrowIncorrectTokenException();
126-
}
127-
128-
_valueTaskSource.GetResult(token);
129-
}
130-
131-
ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
132-
{
133-
if (token != _valueTaskSource.Version)
134-
{
135-
ThrowIncorrectTokenException();
136-
}
137-
138-
return _valueTaskSource.GetStatus(token);
139-
}
140-
141-
void IValueTaskSource.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
142-
{
143-
if (token != _valueTaskSource.Version)
144-
{
145-
ThrowIncorrectTokenException();
146-
}
147-
148-
_valueTaskSource.OnCompleted(continuation, state, token, flags);
149-
}
150-
151-
[DoesNotReturn]
152-
static void ThrowIncorrectTokenException() =>
153-
throw new InvalidOperationException("ValueTask cannot be awaited multiple times.");
15471
}
15572
}

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,11 +229,11 @@ void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers
229229
}
230230

231231
[MethodImpl(MethodImplOptions.AggressiveInlining)]
232-
private ValueTask MaybeEnforceFlowControlAsync(CancellationToken cancellationToken)
232+
private Task MaybeEnforceFlowControlAsync(CancellationToken cancellationToken)
233233
{
234234
if (_flowControlBlock.IsSet)
235235
{
236-
return default;
236+
return Task.CompletedTask;
237237
}
238238

239239
return _flowControlBlock.WaitAsync(cancellationToken);

0 commit comments

Comments
 (0)