Skip to content

Commit 0471a58

Browse files
committed
Consolidate and simplify pooling for simple cases. Probably many opportunities still...
1 parent 1971702 commit 0471a58

File tree

9 files changed

+473
-183
lines changed

9 files changed

+473
-183
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
// //-----------------------------------------------------------------------
2+
// // <copyright file="UnfoldAsyncBenchmarks.cs" company="Akka.NET Project">
3+
// // Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
4+
// // Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// // </copyright>
6+
// //-----------------------------------------------------------------------
7+
8+
using System.Threading.Channels;
9+
using System.Threading.Tasks;
10+
using Akka.Actor;
11+
using Akka.Benchmarks.Configurations;
12+
using Akka.Streams;
13+
using Akka.Streams.Dsl;
14+
using BenchmarkDotNet.Attributes;
15+
16+
namespace Akka.Benchmarks.Streams;
17+
18+
[Config(typeof(MicroBenchmarkConfig))]
19+
public class UnfoldAsyncBenchmarks
20+
{
21+
public struct IntOrCompletion
22+
{
23+
public readonly int IntValue;
24+
public readonly TaskCompletionSource? Completion;
25+
26+
public IntOrCompletion(int intValue, TaskCompletionSource? completion)
27+
{
28+
IntValue = intValue;
29+
Completion = completion;
30+
}
31+
}
32+
private ActorSystem system;
33+
private ActorMaterializer materializer;
34+
35+
private IRunnableGraph<Task> simpleGraph;
36+
private Task<Done> selectAsyncStub;
37+
private Channel<IntOrCompletion> asyncNoYieldCh;
38+
private Task<Done> selectValueTaskAsyncStub;
39+
private Channel<IntOrCompletion> vtAsyncCh;
40+
private Task<Done> unfoldAsyncSyncStub;
41+
private Task<Done> selectAsyncValueTaskSyncStub;
42+
private Channel<IntOrCompletion> asyncYieldCh;
43+
private Channel<IntOrCompletion> vtAsyncYieldCh;
44+
45+
[GlobalSetup]
46+
public void Setup()
47+
{
48+
system = ActorSystem.Create("system");
49+
materializer = system.Materializer();
50+
asyncNoYieldCh = Channel.CreateUnbounded<IntOrCompletion>();
51+
52+
asyncYieldCh = Channel.CreateUnbounded<IntOrCompletion>();
53+
54+
vtAsyncYieldCh = Channel.CreateUnbounded<IntOrCompletion>();
55+
56+
unfoldAsyncSyncStub = Source.UnfoldAsync<ChannelReader<IntOrCompletion>,int>(asyncYieldCh.Reader, async r =>
57+
{
58+
var i = await r.ReadAsync();
59+
if (i.Completion != null)
60+
{
61+
i.Completion.TrySetResult();
62+
return (r, -1);
63+
}
64+
else
65+
{
66+
return (r, i.IntValue);
67+
}
68+
})
69+
.RunWith(Sink.Ignore<int>(), materializer);
70+
71+
selectAsyncValueTaskSyncStub = Source.UnfoldValueTaskAsync<ChannelReader<IntOrCompletion>,int>(vtAsyncYieldCh.Reader, async r =>
72+
{
73+
var i = await r.ReadAsync();
74+
if (i.Completion != null)
75+
{
76+
i.Completion.TrySetResult();
77+
return (r, -1);
78+
}
79+
else
80+
{
81+
return (r, i.IntValue);
82+
}
83+
})
84+
.RunWith(Sink.Ignore<int>(), materializer);
85+
selectAsyncStub = Source.UnfoldAsync<ChannelReader<IntOrCompletion>,int>(asyncNoYieldCh.Reader,async r =>
86+
{
87+
await Task.Yield();
88+
var a = await r.ReadAsync();
89+
if (a.Completion != null)
90+
{
91+
a.Completion.TrySetResult();
92+
return (r, -1);
93+
}
94+
else
95+
{
96+
//await Task.Yield();
97+
// await Task.Delay(0);
98+
return (r, a.IntValue);
99+
}
100+
}).RunWith(Sink.Ignore<int>(), materializer);
101+
vtAsyncCh = Channel.CreateUnbounded<IntOrCompletion>();
102+
int vta = 0;
103+
selectValueTaskAsyncStub = Source.UnfoldValueTaskAsync<ChannelReader<IntOrCompletion>,int>(vtAsyncCh.Reader,async r =>
104+
{
105+
await Task.Yield();
106+
var a = await r.ReadAsync();
107+
if (a.Completion != null)
108+
{
109+
a.Completion.TrySetResult();
110+
return (r, -1);
111+
}
112+
else
113+
{
114+
//await Task.Yield();
115+
//await Task.Delay(0);
116+
return (r, a.IntValue);
117+
}
118+
}).RunWith(Sink.Ignore<int>(), materializer);
119+
}
120+
121+
[GlobalCleanup]
122+
public void Cleanup()
123+
{
124+
materializer.Dispose();
125+
system.Dispose();
126+
}
127+
128+
[Benchmark]
129+
public async Task UnfoldAsyncNoYield()
130+
{
131+
var completion = new TaskCompletionSource(TaskCreationOptions
132+
.RunContinuationsAsynchronously);
133+
for (int i = 0; i < 100; i++)
134+
{
135+
asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
136+
}
137+
138+
asyncNoYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
139+
await completion.Task;
140+
141+
}
142+
143+
144+
[Benchmark]
145+
public async Task UnfoldValueTaskAsyncNoYield()
146+
{
147+
var completion = new TaskCompletionSource(TaskCreationOptions
148+
.RunContinuationsAsynchronously);
149+
for (int i = 0; i < 100; i++)
150+
{
151+
vtAsyncCh.Writer.TryWrite(new IntOrCompletion(i, null));
152+
}
153+
154+
vtAsyncCh.Writer.TryWrite(new IntOrCompletion(0, completion));
155+
await completion.Task;
156+
157+
}
158+
159+
[Benchmark]
160+
public async Task UnfoldAsyncWithYield()
161+
{
162+
var completion = new TaskCompletionSource(TaskCreationOptions
163+
.RunContinuationsAsynchronously);
164+
for (int i = 0; i < 100; i++)
165+
{
166+
asyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
167+
await Task.Delay(1);
168+
}
169+
170+
asyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
171+
await completion.Task;
172+
173+
}
174+
175+
176+
[Benchmark]
177+
public async Task UnfoldValueTaskAsyncWithYield()
178+
{
179+
var completion = new TaskCompletionSource(TaskCreationOptions
180+
.RunContinuationsAsynchronously);
181+
for (int i = 0; i < 100; i++)
182+
{
183+
vtAsyncYieldCh.Writer.TryWrite(new IntOrCompletion(i, null));
184+
await Task.Delay(1);
185+
}
186+
187+
vtAsyncYieldCh.Writer.TryWrite(new IntOrCompletion(0, completion));
188+
await completion.Task;
189+
190+
}
191+
}

src/core/Akka.Streams/Dsl/Source.cs

+3
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,9 @@ public static Source<TElem, NotUsed> Unfold<TState, TElem>(TState state, Func<TS
789789
public static Source<TElem, NotUsed> UnfoldAsync<TState, TElem>(TState state, Func<TState, Task<Option<(TState, TElem)>>> unfoldAsync)
790790
=> FromGraph(new UnfoldAsync<TState, TElem>(state, unfoldAsync)).WithAttributes(DefaultAttributes.UnfoldAsync);
791791

792+
793+
public static Source<TElem, NotUsed> UnfoldValueTaskAsync<TState, TElem>(TState state, Func<TState, ValueTask<Option<(TState, TElem)>>> unfoldAsync)
794+
=> FromGraph(new UnfoldValueTaskAsync<TState, TElem>(state, unfoldAsync)).WithAttributes(DefaultAttributes.UnfoldValueTaskAsync);
792795
/// <summary>
793796
/// Simpler <see cref="Unfold{TState,TElem}"/>, for infinite sequences.
794797
/// </summary>

0 commit comments

Comments
 (0)