Skip to content

Commit 1ca6f47

Browse files
committed
add demo
1 parent 88e93e6 commit 1ca6f47

File tree

8 files changed

+58
-23
lines changed

8 files changed

+58
-23
lines changed

README.md

+19
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55

66
This repository demonstrates how to use `IAsyncEnumerable` and `System.Linq.Async` to build pipelines in C#.
77

8+
9+
> [!IMPORTANT]
10+
> This repository doesn't cover all the possible concerns such as error handling, cancellation, backpressure, performance, etc. It's just a simple demonstration of how to build pipelines with `IAsyncEnumerable` and `System.Linq.Async`.
11+
812
```bash
913
dotnet example --list
1014
```
@@ -19,3 +23,18 @@ dotnet example --list
1923
│ TextSummarizationAndAggregationPipeline │ Demonstrates how to build custom async-enumerable operators. │
2024
╰─────────────────────────────────────────┴────────────────────────────────────────────────────────────────────────────────────────────╯
2125
```
26+
## Demo: CalculateWordCountPipeline
27+
28+
<video src="https://github.com/user-attachments/assets/84c1e8a8-996d-4960-9b39-20e6bd1101a9" controls="controls"></video>
29+
30+
## Demo: CalculateWordCountBatchPipeline
31+
32+
<video src="https://github.com/user-attachments/assets/56db32bd-a7e9-41ec-8706-eaf876750bb6" controls="controls"></video>
33+
34+
## Demo: CalculateWordCountFileWatcherPipeline
35+
36+
<video src="https://github.com/user-attachments/assets/96cc653d-8b42-4779-b2f2-fce804f0160b" controls="controls"></video>
37+
38+
## Demo: TextSummarizationAndAggregationPipeline
39+
40+
<video src="https://github.com/user-attachments/assets/42c6eb97-7a11-4b89-857e-1ffb8e70073c" controls="controls"></video>

assets/demo1.mp4

22 MB
Binary file not shown.

assets/demo2.mp4

27.6 MB
Binary file not shown.

assets/demo3.mp4

18.3 MB
Binary file not shown.

assets/demo4.mp4

59.5 MB
Binary file not shown.

examples/CalculateWordCountFileWatcherPipeline/Program.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
var fileWatcher = CreateFileObservable(path);
77

88
var pipeline = fileWatcher
9-
.TakeUntil(DateTimeOffset.Now.AddMinutes(1))
9+
.TakeUntil(DateTimeOffset.Now.AddSeconds(15))
1010
.ToAsyncEnumerable()
1111
.SelectAwait(ReadFile)
1212
.Where(IsValidFileForProcessing)
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
using Microsoft.Extensions.DependencyInjection;
22
using Microsoft.Extensions.Hosting;
3+
using Microsoft.Extensions.Logging;
34
using Microsoft.SemanticKernel;
4-
using Microsoft.SemanticKernel.Connectors.OpenAI;
55
using Shared;
6+
using Spectre.Console;
67
using static Shared.Steps;
78

8-
var (kernel, summarizationFunction) = Init();
9+
var kernel = Init();
910
var path = Path.Combine(Directory.GetCurrentDirectory(), "..", "..", "Data");
1011

1112
var pipeline = Directory
@@ -15,14 +16,18 @@
1516
.SelectAwait(ReadFile)
1617
.Where(IsValidFileForProcessing)
1718
.SelectAwait(Summarize)
18-
.WriteResultToFile(path: Path.Combine(path, "summaries.txt"))
19-
.ForEachAsync(x => Console.WriteLine($"Processed {x.Name}"));
19+
.WriteResultToFile(path: Path.Combine(Path.GetTempPath(), "summaries.txt"))
20+
.ForEachAsync(x => AnsiConsole.MarkupLine($"Processed [green]{x.Name}[/]"));
2021

2122
await pipeline;
2223

23-
static (Kernel kernel, KernelFunction summarizationFunction) Init()
24+
static Kernel Init()
2425
{
25-
var builder = Host.CreateApplicationBuilder();
26+
var builder = Host.CreateApplicationBuilder(
27+
new HostApplicationBuilderSettings { EnvironmentName = Environments.Development }
28+
);
29+
builder.Services.AddLogging(builder => builder.SetMinimumLevel(LogLevel.None));
30+
2631
var endpoint = builder.Configuration["AZURE_OPENAI_ENDPOINT"]!;
2732
var deployment = builder.Configuration["AZURE_OPENAI_GPT_NAME"]!;
2833
var key = builder.Configuration["AZURE_OPENAI_KEY"]!;
@@ -33,21 +38,20 @@
3338
var services = builder.Build().Services;
3439

3540
var kernel = services.GetRequiredService<Kernel>();
36-
var prompt = """
37-
Please summarize the the following text in 20 words or less:
38-
${input}
39-
""";
40-
var summarizationFunction = kernel.CreateFunctionFromPrompt(prompt);
4141

42-
return (kernel, summarizationFunction);
42+
return kernel;
4343
}
4444

4545
async ValueTask<SummarizationPayload> Summarize(FilePayload file)
4646
{
47-
var result = await summarizationFunction.InvokeAsync(
48-
kernel,
49-
new KernelArguments(new OpenAIPromptExecutionSettings() { MaxTokens = 400 }) { ["input"] = file.Content }
50-
);
47+
var prompt = """
48+
{{$input}}
49+
Please summarize the content above in 20 words or less:
50+
51+
The output format should be: [title]: [summary]
52+
""";
53+
54+
var result = await kernel.InvokePromptAsync(prompt, new KernelArguments() { ["input"] = file.Content });
5155

5256
return new(file.Name, result.ToString());
5357
}
@@ -61,7 +65,7 @@ string path
6165
{
6266
const int batchSize = 10;
6367

64-
using var streamWriter = new StreamWriter(path);
68+
using var streamWriter = new StreamWriter(path, append: true);
6569

6670
await foreach (var batch in values.Buffer(batchSize))
6771
{
@@ -72,17 +76,28 @@ string path
7276
yield return value;
7377
}
7478

75-
streamWriter.Flush();
79+
await streamWriter.FlushAsync();
7680
}
81+
82+
AnsiConsole.MarkupLine($"Results written to [green]{path}[/]");
7783
}
7884

79-
public static async IAsyncEnumerable<T> ReportProgress<T>(this IAsyncEnumerable<T> values)
85+
public static async IAsyncEnumerable<string> ReportProgress(this IAsyncEnumerable<string> values)
8086
{
8187
var totalCount = await values.CountAsync();
8288

83-
await foreach (var value in values)
89+
await foreach (var (value, index) in values.Select((value, index) => (value, index)))
8490
{
8591
yield return value;
92+
93+
AnsiConsole
94+
.Progress()
95+
.Start(ctx =>
96+
{
97+
var task = ctx.AddTask($"Processing - {Path.GetFileName(value)}", true, totalCount);
98+
task.Increment(index + 1);
99+
task.StopTask();
100+
});
86101
}
87102
}
88103
}

examples/TextSummarizationAndAggregationPipeline/TextSummarizationAndAggregationPipeline.csproj

+3-2
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
<TargetFramework>net8.0</TargetFramework>
66
<ImplicitUsings>enable</ImplicitUsings>
77
<Nullable>enable</Nullable>
8-
<ExampleDescription>Demonstrates how to build custom async-enumerable operators.</ExampleDescription>
8+
<ExampleDescription>Demonstrates how to build custom async-enumerable operators</ExampleDescription>
99
<ExampleOrder>4</ExampleOrder>
10-
<UserSecretsId>3f392587-2601-400b-84a6-b7340d1436ef</UserSecretsId>
10+
<UserSecretsId>e789b935-ac65-4262-a2e3-dcca0f6e9cfe</UserSecretsId>
1111
</PropertyGroup>
1212

1313
<ItemGroup>
@@ -26,6 +26,7 @@
2626
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="8.0.0" />
2727
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
2828
<PackageReference Include="Microsoft.SemanticKernel" Version="1.17.2" />
29+
<PackageReference Include="Spectre.Console" Version="0.49.1" />
2930
</ItemGroup>
3031

3132
</Project>

0 commit comments

Comments
 (0)