Skip to content

Commit 83132aa

Browse files
committed
feat(Runner): Implemented the ability to run processes asynchronously without awaiting their completion
Signed-off-by: Charles d'Avernas <[email protected]>
1 parent d4c4ca7 commit 83132aa

File tree

11 files changed

+53
-32
lines changed

11 files changed

+53
-32
lines changed

src/api/Synapse.Api.Client.Http/Synapse.Api.Client.Http.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343

4444
<ItemGroup>
4545
<PackageReference Include="Microsoft.AspNetCore.SignalR.Client" Version="8.0.10" />
46-
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5" />
46+
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5.1" />
4747
<PackageReference Include="System.Reactive" Version="6.0.1" />
4848
</ItemGroup>
4949

src/cli/Synapse.Cli/Synapse.Cli.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.2" />
3434
<PackageReference Include="moment.net" Version="1.3.4" />
3535
<PackageReference Include="NetEscapades.Configuration.Yaml" Version="3.1.0" />
36-
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5" />
36+
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5.1" />
3737
<PackageReference Include="Spectre.Console" Version="0.49.1" />
3838
<PackageReference Include="System.CommandLine.NamingConventionBinder" Version="2.0.0-beta4.22272.1" />
3939
</ItemGroup>

src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented.Redis" Version="4.15.8" />
5151
<PackageReference Include="Neuroglia.Mediation" Version="4.15.8" />
5252
<PackageReference Include="Neuroglia.Plugins" Version="4.15.8" />
53-
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5" />
53+
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5.1" />
5454
</ItemGroup>
5555

5656
<ItemGroup>

src/core/Synapse.Core/Synapse.Core.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented" Version="4.15.8" />
7070
<PackageReference Include="Neuroglia.Eventing.CloudEvents" Version="4.15.8" />
7171
<PackageReference Include="Semver" Version="2.3.0" />
72-
<PackageReference Include="ServerlessWorkflow.Sdk" Version="1.0.0-alpha5" />
72+
<PackageReference Include="ServerlessWorkflow.Sdk" Version="1.0.0-alpha5.1" />
7373
</ItemGroup>
7474

7575
</Project>

src/runner/Synapse.Runner/Services/Executors/ContainerProcessExecutor.cs

+5
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
6262
try
6363
{
6464
await this.Container!.StartAsync(cancellationToken).ConfigureAwait(false);
65+
if (this.Task.Definition.Run.Await != false)
66+
{
67+
await this.SetResultAsync(new(), this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
68+
return;
69+
}
6570
await this.Container.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
6671
var standardOutput = (this.Container.StandardOutput == null ? null : await this.Container.StandardOutput.ReadToEndAsync(cancellationToken).ConfigureAwait(false))?.Trim();
6772
if (this.Options.Containers.Platform == ContainerPlatform.Docker) standardOutput = standardOutput?[8..];

src/runner/Synapse.Runner/Services/Executors/ScriptProcessExecutor.cs

+7-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,12 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
6868
var environment = this.ProcessDefinition.Environment == null
6969
? null
7070
: await this.ProcessDefinition.Environment.ToAsyncEnumerable().ToDictionaryAwaitAsync(kvp => ValueTask.FromResult(kvp.Key), async kvp => (await this.EvaluateAndSerializeAsync(kvp.Value, cancellationToken).ConfigureAwait(false))!, cancellationToken).ConfigureAwait(false);
71-
using var process = await executor.ExecuteAsync(script, arguments, environment, cancellationToken).ConfigureAwait(false);
71+
var process = await executor.ExecuteAsync(script, arguments, environment, cancellationToken).ConfigureAwait(false);
72+
if (this.Task.Definition.Run.Await != false)
73+
{
74+
await this.SetResultAsync(new(), this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
75+
return;
76+
}
7277
await process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
7378
var rawOutput = (await process.StandardOutput.ReadToEndAsync(cancellationToken).ConfigureAwait(false)).Trim();
7479
var errorMessage = (await process.StandardError.ReadToEndAsync(cancellationToken).ConfigureAwait(false)).Trim();
@@ -81,6 +86,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
8186
Detail = errorMessage,
8287
Instance = this.Task.Instance.Reference
8388
}, cancellationToken).ConfigureAwait(false);
89+
process.Dispose();
8490
}
8591

8692
/// <summary>

src/runner/Synapse.Runner/Services/Executors/ShellProcessExecutor.cs

+7-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,12 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
4949
RedirectStandardError = true,
5050
CreateNoWindow = true
5151
};
52-
using var process = Process.Start(startInfo) ?? throw new NullReferenceException($"Failed to create the shell process defined at '{this.Task.Instance.Reference}'");
52+
var process = Process.Start(startInfo) ?? throw new NullReferenceException($"Failed to create the shell process defined at '{this.Task.Instance.Reference}'");
53+
if (this.Task.Definition.Run.Await != false)
54+
{
55+
await this.SetResultAsync(new(), this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
56+
return;
57+
}
5358
await process.WaitForExitAsync(cancellationToken).ConfigureAwait(false);
5459
var rawOutput = (await process.StandardOutput.ReadToEndAsync(cancellationToken).ConfigureAwait(false)).Trim();
5560
var errorMessage = (await process.StandardError.ReadToEndAsync(cancellationToken).ConfigureAwait(false)).Trim();
@@ -62,6 +67,7 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
6267
Detail = errorMessage,
6368
Instance = this.Task.Instance.Reference
6469
}, cancellationToken).ConfigureAwait(false);
70+
process.Dispose();
6571
}
6672

6773
}

src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs

+26-22
Original file line numberDiff line numberDiff line change
@@ -111,38 +111,42 @@ await this.SetErrorAsync(new()
111111
};
112112
this.Subflow = await this.Api.WorkflowInstances.CreateAsync(this.Subflow, cancellationToken).ConfigureAwait(false);
113113
}
114-
await foreach(var watchEvent in this.Api.WorkflowInstances.MonitorAsync(this.Subflow.GetName(), this.Subflow.GetNamespace()!, cancellationToken))
114+
if (this.Task.Definition.Run.Await == false) await this.SetResultAsync(new(), this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
115+
else
115116
{
116-
switch (watchEvent.Resource.Status?.Phase)
117+
await foreach (var watchEvent in this.Api.WorkflowInstances.MonitorAsync(this.Subflow.GetName(), this.Subflow.GetNamespace()!, cancellationToken))
117118
{
118-
case WorkflowInstanceStatusPhase.Cancelled:
119-
if (!this.Cancelling)
120-
{
121-
await this.SetErrorAsync(new()
119+
switch (watchEvent.Resource.Status?.Phase)
120+
{
121+
case WorkflowInstanceStatusPhase.Cancelled:
122+
if (!this.Cancelling)
122123
{
123-
Type = ErrorType.Runtime,
124-
Status = ErrorStatus.Runtime,
125-
Title = ErrorTitle.Runtime,
126-
Detail = $"The execution of workflow instance '{this.Subflow.GetQualifiedName()}' has been cancelled"
127-
}, cancellationToken).ConfigureAwait(false);
128-
}
129-
break;
130-
case WorkflowInstanceStatusPhase.Faulted:
131-
await this.SetErrorAsync(watchEvent.Resource.Status.Error!, cancellationToken).ConfigureAwait(false);
132-
return;
133-
case WorkflowInstanceStatusPhase.Completed:
134-
var output = string.IsNullOrWhiteSpace(watchEvent.Resource.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(watchEvent.Resource.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content;
135-
await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
136-
return;
124+
await this.SetErrorAsync(new()
125+
{
126+
Type = ErrorType.Runtime,
127+
Status = ErrorStatus.Runtime,
128+
Title = ErrorTitle.Runtime,
129+
Detail = $"The execution of workflow instance '{this.Subflow.GetQualifiedName()}' has been cancelled"
130+
}, cancellationToken).ConfigureAwait(false);
131+
}
132+
break;
133+
case WorkflowInstanceStatusPhase.Faulted:
134+
await this.SetErrorAsync(watchEvent.Resource.Status.Error!, cancellationToken).ConfigureAwait(false);
135+
return;
136+
case WorkflowInstanceStatusPhase.Completed:
137+
var output = string.IsNullOrWhiteSpace(watchEvent.Resource.Status?.OutputReference) ? null : (await this.Api.Documents.GetAsync(watchEvent.Resource.Status.OutputReference, cancellationToken).ConfigureAwait(false)).Content;
138+
await this.SetResultAsync(output, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
139+
return;
140+
}
141+
if (this.Cancelling) break;
137142
}
138-
if (this.Cancelling) break;
139143
}
140144
}
141145

142146
/// <inheritdoc/>
143147
public override async Task CancelAsync(CancellationToken cancellationToken = default)
144148
{
145-
if(this.Subflow != null)
149+
if (this.Subflow != null && this.Task.Definition.Run.Await != false)
146150
{
147151
try
148152
{

src/runner/Synapse.Runner/Synapse.Runner.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
<ItemGroup>
5252
<PackageReference Include="Docker.DotNet" Version="3.125.15" />
5353
<PackageReference Include="DynamicGrpc" Version="1.4.0" />
54-
<PackageReference Include="Google.Protobuf" Version="3.28.2" />
54+
<PackageReference Include="Google.Protobuf" Version="3.28.3" />
5555
<PackageReference Include="Grpc.Core" Version="2.46.6" />
5656
<PackageReference Include="Microsoft.Extensions.Configuration.KeyPerFile" Version="8.0.10" />
5757
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />

tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
<PackageReference Include="FluentAssertions" Version="6.12.1" />
1818
<PackageReference Include="Microsoft.AspNetCore.Mvc.Testing" Version="8.0.10" />
1919
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
20-
<PackageReference Include="ServerlessWorkflow.Sdk.Builders" Version="1.0.0-alpha5" />
20+
<PackageReference Include="ServerlessWorkflow.Sdk.Builders" Version="1.0.0-alpha5.1" />
2121
<PackageReference Include="Testcontainers" Version="3.10.0" />
2222
<PackageReference Include="xunit" Version="2.9.2" />
2323
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2">

tests/Synapse.UnitTests/Synapse.UnitTests.csproj

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
<PackageReference Include="Neuroglia.Data.Expressions.JQ" Version="4.15.8" />
2323
<PackageReference Include="Neuroglia.Data.Infrastructure.Memory" Version="4.15.8" />
2424
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented.Redis" Version="4.15.8" />
25-
<PackageReference Include="ServerlessWorkflow.Sdk.Builders" Version="1.0.0-alpha5" />
26-
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5" />
25+
<PackageReference Include="ServerlessWorkflow.Sdk.Builders" Version="1.0.0-alpha5.1" />
26+
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5.1" />
2727
<PackageReference Include="Testcontainers" Version="3.10.0" />
2828
<PackageReference Include="xunit" Version="2.9.2" />
2929
<PackageReference Include="Xunit.Gherkin.Quick" Version="4.5.0" />

0 commit comments

Comments
 (0)