Skip to content

Commit 40914e3

Browse files
authored
Merge pull request #459 from serverlessworkflow/fix-try-catch-context-handling
Fix try catch context handling
2 parents 7fd2983 + 9951627 commit 40914e3

File tree

16 files changed

+122
-67
lines changed

16 files changed

+122
-67
lines changed

src/api/Synapse.Api.Http/ClusterResourceController.cs

+16-8
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,16 @@ public virtual async Task<IActionResult> WatchResourcesUsingSSE(string? labelSel
109109
this.Response.Headers.CacheControl = "no-cache";
110110
this.Response.Headers.Connection = "keep-alive";
111111
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
112-
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
112+
try
113113
{
114-
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
115-
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
116-
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
114+
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
115+
{
116+
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
117+
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
118+
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
119+
}
117120
}
121+
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { }
118122
return this.Ok();
119123
}
120124

@@ -149,12 +153,16 @@ public virtual async Task<IActionResult> MonitorResourceUsingSSE(string name, Ca
149153
this.Response.Headers.CacheControl = "no-cache";
150154
this.Response.Headers.Connection = "keep-alive";
151155
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
152-
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
156+
try
153157
{
154-
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
155-
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
156-
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
158+
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
159+
{
160+
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
161+
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
162+
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
163+
}
157164
}
165+
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { }
158166
return this.Ok();
159167
}
160168

src/api/Synapse.Api.Http/NamespacedResourceController.cs

+16-10
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14-
using Neuroglia.Data.Infrastructure.ResourceOriented;
15-
1614
namespace Synapse.Api.Http;
1715

1816
/// <summary>
@@ -164,12 +162,16 @@ public virtual async Task<IActionResult> WatchResourcesUsingSSE(string @namespac
164162
this.Response.Headers.CacheControl = "no-cache";
165163
this.Response.Headers.Connection = "keep-alive";
166164
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
167-
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
165+
try
168166
{
169-
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
170-
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
171-
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
167+
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
168+
{
169+
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
170+
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
171+
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
172+
}
172173
}
174+
catch (Exception ex) when(ex is TaskCanceledException || ex is OperationCanceledException) { }
173175
return this.Ok();
174176
}
175177

@@ -206,12 +208,16 @@ public virtual async Task<IActionResult> MonitorResourceUsingSSE(string name, st
206208
this.Response.Headers.CacheControl = "no-cache";
207209
this.Response.Headers.Connection = "keep-alive";
208210
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
209-
await foreach(var e in response.Data!.WithCancellation(cancellationToken))
211+
try
210212
{
211-
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
212-
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
213-
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
213+
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
214+
{
215+
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
216+
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
217+
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
218+
}
214219
}
220+
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { }
215221
return this.Ok();
216222
}
217223

src/api/Synapse.Api.Http/Synapse.Api.Http.csproj

+3-3
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@
4343
</ItemGroup>
4444

4545
<ItemGroup>
46-
<PackageReference Include="Neuroglia.Mediation.AspNetCore" Version="4.16.0" />
47-
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.16.0" />
48-
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="7.0.0" />
46+
<PackageReference Include="Neuroglia.Mediation.AspNetCore" Version="4.16.2" />
47+
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.16.2" />
48+
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="7.1.0" />
4949
</ItemGroup>
5050

5151
<ItemGroup>

src/api/Synapse.Api.Server/Synapse.Api.Server.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
<PackageReference Include="Microsoft.AspNetCore.Authentication.OpenIdConnect" Version="9.0.0" />
3636
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Server" Version="9.0.0" />
3737
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
38-
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="7.0.0" />
38+
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="7.1.0" />
3939
</ItemGroup>
4040

4141
<ItemGroup>

src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj

+6-6
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@
4444

4545
<ItemGroup>
4646
<PackageReference Include="IdentityModel" Version="7.0.0" />
47-
<PackageReference Include="Microsoft.IdentityModel.JsonWebTokens" Version="8.2.0" />
48-
<PackageReference Include="Neuroglia.Data.Expressions.Abstractions" Version="4.16.0" />
49-
<PackageReference Include="Neuroglia.Data.Infrastructure.Redis" Version="4.16.0" />
50-
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented.Redis" Version="4.16.0" />
51-
<PackageReference Include="Neuroglia.Mediation" Version="4.16.0" />
52-
<PackageReference Include="Neuroglia.Plugins" Version="4.16.0" />
47+
<PackageReference Include="Microsoft.IdentityModel.JsonWebTokens" Version="8.3.0" />
48+
<PackageReference Include="Neuroglia.Data.Expressions.Abstractions" Version="4.16.2" />
49+
<PackageReference Include="Neuroglia.Data.Infrastructure.Redis" Version="4.16.2" />
50+
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented.Redis" Version="4.16.2" />
51+
<PackageReference Include="Neuroglia.Mediation" Version="4.16.2" />
52+
<PackageReference Include="Neuroglia.Plugins" Version="4.16.2" />
5353
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5.1" />
5454
</ItemGroup>
5555

src/core/Synapse.Core/Synapse.Core.csproj

+2-2
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@
6666
<ItemGroup>
6767
<PackageReference Include="Docker.DotNet" Version="3.125.15" />
6868
<PackageReference Include="KubernetesClient" Version="15.0.1" />
69-
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented" Version="4.16.0" />
70-
<PackageReference Include="Neuroglia.Eventing.CloudEvents" Version="4.16.0" />
69+
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented" Version="4.16.2" />
70+
<PackageReference Include="Neuroglia.Eventing.CloudEvents" Version="4.16.2" />
7171
<PackageReference Include="Semver" Version="3.0.0" />
7272
<PackageReference Include="ServerlessWorkflow.Sdk" Version="1.0.0-alpha5.1" />
7373
</ItemGroup>

src/correlator/Synapse.Correlator/Synapse.Correlator.csproj

+8-8
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,14 @@
3636
<PackageReference Include="Microsoft.Extensions.Configuration.KeyPerFile" Version="9.0.0" />
3737
<PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.0" />
3838
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
39-
<PackageReference Include="Neuroglia.Data.Expressions.JavaScript" Version="4.16.0" />
40-
<PackageReference Include="Neuroglia.Data.Expressions.JQ" Version="4.16.0" />
41-
<PackageReference Include="Neuroglia.Eventing.CloudEvents.AspNetCore" Version="4.16.0" />
42-
<PackageReference Include="Neuroglia.Mediation.AspNetCore" Version="4.16.0" />
43-
<PackageReference Include="Neuroglia.Eventing.CloudEvents.Infrastructure" Version="4.16.0" />
44-
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.16.0" />
45-
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="7.0.0" />
46-
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="7.0.0" />
39+
<PackageReference Include="Neuroglia.Data.Expressions.JavaScript" Version="4.16.2" />
40+
<PackageReference Include="Neuroglia.Data.Expressions.JQ" Version="4.16.2" />
41+
<PackageReference Include="Neuroglia.Eventing.CloudEvents.AspNetCore" Version="4.16.2" />
42+
<PackageReference Include="Neuroglia.Mediation.AspNetCore" Version="4.16.2" />
43+
<PackageReference Include="Neuroglia.Eventing.CloudEvents.Infrastructure" Version="4.16.2" />
44+
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.16.2" />
45+
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="7.1.0" />
46+
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="7.1.0" />
4747
</ItemGroup>
4848

4949
<ItemGroup>

src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@
1010
</PropertyGroup>
1111

1212
<ItemGroup>
13-
<PackageReference Include="Blazor.Bootstrap" Version="3.1.1" />
13+
<PackageReference Include="Blazor.Bootstrap" Version="3.2.0" />
1414
<PackageReference Include="BlazorMonaco" Version="3.2.0" />
1515
<PackageReference Include="IdentityModel" Version="7.0.0" />
1616
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly" Version="9.0.0" />
1717
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Authentication" Version="9.0.0" />
1818
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.DevServer" Version="9.0.0" PrivateAssets="all" />
1919
<PackageReference Include="moment.net" Version="1.3.4" />
20-
<PackageReference Include="Neuroglia.Blazor.Dagre" Version="4.16.0" />
20+
<PackageReference Include="Neuroglia.Blazor.Dagre" Version="4.16.2" />
2121
</ItemGroup>
2222

2323
<ItemGroup>

src/operator/Synapse.Operator/Services/WorkflowController.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public override async Task StartAsync(CancellationToken cancellationToken)
5555
this.Operator!.Select(b => b.Resource.Spec.Selector).DistinctUntilChanged().SubscribeAsync(this.OnResourceSelectorChangedAsync, cancellationToken: cancellationToken);
5656
this.Where(e => e.Type == ResourceWatchEventType.Updated)
5757
.Select(e => new { Workflow = e.Resource, e.Resource.Spec.Versions })
58-
.DistinctUntilChanged()
58+
.DistinctUntilChanged(s => s.Versions)
5959
.Scan((Previous: (EquatableList<WorkflowDefinition>)null!, Current: (EquatableList<WorkflowDefinition>)null!, Workflow: (Workflow)null!), (accumulator, current) => (accumulator.Current, current.Versions, current.Workflow))
6060
.SubscribeAsync(async value => await this.OnWorkflowVersionChangedAsync(value.Workflow, value.Previous, value.Current).ConfigureAwait(false), cancellationToken: cancellationToken);
6161
await this.OnResourceSelectorChangedAsync(this.Operator!.Resource.Spec.Selector).ConfigureAwait(false);
@@ -190,6 +190,7 @@ protected virtual async Task OnWorkflowVersionChangedAsync(Workflow workflow, Eq
190190
if (workflow.Metadata.Labels == null || !workflow.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out _)) if (!await this.TryClaimAsync(workflow, this.CancellationTokenSource.Token).ConfigureAwait(false)) return;
191191
if (workflow.Metadata.Labels?[SynapseDefaults.Resources.Labels.Operator] != this.Operator.Resource.GetQualifiedName()) return;
192192
var diffPatch = JsonPatchUtility.CreateJsonPatchFromDiff(previous, current);
193+
if (diffPatch.Operations.Count < 1) return;
193194
var operation = diffPatch.Operations[0].Op;
194195
if (this.Schedulers.TryRemove(workflow.GetQualifiedName(), out var scheduler)) await scheduler.DisposeAsync().ConfigureAwait(false);
195196
if (operation == OperationType.Remove) return;

src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs

+49-11
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14+
using Neuroglia.Data.Infrastructure.Services;
15+
1416
namespace Synapse.Operator.Services;
1517

1618
/// <summary>
@@ -21,7 +23,8 @@ namespace Synapse.Operator.Services;
2123
/// <param name="controllerOptions">The service used to access the current <see cref="IOptions{TOptions}"/></param>
2224
/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
2325
/// <param name="operatorController">The service used to access the current <see cref="Resources.Operator"/></param>
24-
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController operatorController)
26+
/// <param name="documents">The <see cref="IRepository"/> used to manage <see cref="Document"/>s</param>
27+
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController operatorController, IRepository<Document, string> documents)
2528
: ResourceController<WorkflowInstance>(loggerFactory, controllerOptions, repository)
2629
{
2730

@@ -35,6 +38,11 @@ public class WorkflowInstanceController(IServiceProvider serviceProvider, ILogge
3538
/// </summary>
3639
protected IResourceMonitor<Resources.Operator> Operator => operatorController.Operator;
3740

41+
/// <summary>
42+
/// Gets the <see cref="IRepository"/> used to manage <see cref="Document"/>s
43+
/// </summary>
44+
protected IRepository<Document, string> Documents => documents;
45+
3846
/// <summary>
3947
/// Gets a <see cref="ConcurrentDictionary{TKey, TValue}"/> that contains current <see cref="WorkflowInstanceHandler"/>es
4048
/// </summary>
@@ -139,24 +147,54 @@ public override async Task StopAsync(CancellationToken cancellationToken)
139147
/// <inheritdoc/>
140148
protected override async Task OnResourceCreatedAsync(WorkflowInstance workflowInstance, CancellationToken cancellationToken = default)
141149
{
142-
await base.OnResourceCreatedAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
143-
if (!await this.TryClaimAsync(workflowInstance, cancellationToken).ConfigureAwait(false)) return;
144-
var handler = await this.CreateWorkflowInstanceHandlerAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
145-
await handler.HandleAsync(cancellationToken).ConfigureAwait(false);
150+
try
151+
{
152+
await base.OnResourceCreatedAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
153+
if (!await this.TryClaimAsync(workflowInstance, cancellationToken).ConfigureAwait(false)) return;
154+
var handler = await this.CreateWorkflowInstanceHandlerAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
155+
await handler.HandleAsync(cancellationToken).ConfigureAwait(false);
156+
}
157+
catch(Exception ex)
158+
{
159+
this.Logger.LogError("An error occured while handling the creation of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
160+
}
146161
}
147162

148163
/// <inheritdoc/>
149164
protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowInstance, CancellationToken cancellationToken = default)
150165
{
151-
await base.OnResourceDeletedAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
152-
if (this.Handlers.TryRemove(workflowInstance.GetQualifiedName(), out var process)) await process.DisposeAsync().ConfigureAwait(false);
153-
var selectors = new LabelSelector[]
166+
try
154167
{
168+
await base.OnResourceDeletedAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
169+
if (this.Handlers.TryRemove(workflowInstance.GetQualifiedName(), out var process)) await process.DisposeAsync().ConfigureAwait(false);
170+
var selectors = new LabelSelector[]
171+
{
155172
new(SynapseDefaults.Resources.Labels.WorkflowInstance, LabelSelectionOperator.Equals, workflowInstance.GetQualifiedName())
156-
};
157-
await foreach (var correlation in this.Repository.GetAllAsync<Correlation>(null, selectors, cancellationToken: cancellationToken))
173+
};
174+
await foreach (var correlation in this.Repository.GetAllAsync<Correlation>(null, selectors, cancellationToken: cancellationToken))
175+
{
176+
await this.Repository.RemoveAsync<Correlation>(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
177+
}
178+
if (workflowInstance.Status != null)
179+
{
180+
var documentReferences = new List<string>();
181+
if (!string.IsNullOrWhiteSpace(workflowInstance.Status.ContextReference)) documentReferences.Add(workflowInstance.Status.ContextReference);
182+
if (!string.IsNullOrWhiteSpace(workflowInstance.Status.OutputReference)) documentReferences.Add(workflowInstance.Status.OutputReference);
183+
if (workflowInstance.Status.Tasks != null)
184+
{
185+
foreach (var task in workflowInstance.Status.Tasks)
186+
{
187+
if (!string.IsNullOrWhiteSpace(task.ContextReference)) documentReferences.Add(task.ContextReference);
188+
if (!string.IsNullOrWhiteSpace(task.InputReference)) documentReferences.Add(task.InputReference);
189+
if (!string.IsNullOrWhiteSpace(task.OutputReference)) documentReferences.Add(task.OutputReference);
190+
}
191+
}
192+
foreach (var documentReference in documentReferences.Distinct()) await this.Documents.RemoveAsync(documentReference, cancellationToken).ConfigureAwait(false);
193+
}
194+
}
195+
catch(Exception ex)
158196
{
159-
await this.Repository.RemoveAsync<Correlation>(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
197+
this.Logger.LogError("An error occured while handling the deletion of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
160198
}
161199
}
162200

src/operator/Synapse.Operator/appsettings.Development.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
},
1717
"Runtime": {
1818
"Native": {
19-
"Directory": "..\\..\\..\\..\\..\\runner\\Synapse.Runner\\bin\\Debug\\net8.0\\",
19+
"Directory": "..\\..\\..\\..\\..\\runner\\Synapse.Runner\\bin\\Debug\\net9.0\\",
2020
"Executable": "Synapse.Runner.exe"
2121
}
2222
}

0 commit comments

Comments
 (0)