Skip to content

Commit e149870

Browse files
committed
fix(Runner): Fixed the ConnectedWorkflowExecutionContext to remove handled correlation contexts from the WorkflowInstance they are associated to
fix(Runner): Fixed the `ConnectedWorkflowExecutionContext` to populate `WorkfowInstance` correlation keys set by handled correlation contexts fix(Runner): Fixed a feature-breaking bug with the `AsyncApiCallExecutor` disposing inline of the IAsyncApiSubscribeOperationResult, thus irreversibly breaking the message stream fix(Runner): Fixed the `AsyncApiCallExecutor`, which was not properly evaluating `while` and `until` message consumption conditions fix(Runner): Fixed the `AsyncApiCallExecutor`, which was not passing the `item` and `index` arguments to the `output.as` and `export.as` expressions Signed-off-by: Charles d'Avernas <[email protected]>
1 parent 6e52198 commit e149870

File tree

5 files changed

+71
-26
lines changed

5 files changed

+71
-26
lines changed

src/api/Synapse.Api.Http/Extensions/IServiceCollectionExtensions.cs

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
using Synapse.Api.Http.Controllers;
2121
using Synapse.Core.Api.Services;
2222
using System.Text.Json;
23+
using System.Text.Json.Serialization;
2324

2425
namespace Synapse.Api.Http;
2526

@@ -44,6 +45,7 @@ public static IServiceCollection AddSynapseHttpApi(this IServiceCollection servi
4445
.AddJsonOptions(options =>
4546
{
4647
options.JsonSerializerOptions.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
48+
options.JsonSerializerOptions.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull | JsonIgnoreCondition.WhenWritingDefault;
4749
})
4850
.AddApplicationPart(typeof(WorkflowsController).Assembly);
4951
services.AddIdentityServer(options =>

src/core/Synapse.Core/Resources/CorrelationSpec.cs

+8-2
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,22 @@ public record CorrelationSpec
4646
[DataMember(Name = "events", Order = 4), JsonPropertyName("events"), JsonPropertyOrder(4), YamlMember(Alias = "events", Order = 4)]
4747
public virtual EventConsumptionStrategyDefinition Events { get; set; } = null!;
4848

49+
/// <summary>
50+
/// Gets/sets a key/value mapping, if any, of the keys to use to correlate events
51+
/// </summary>
52+
[DataMember(Name = "keys", Order = 5), JsonPropertyName("keys"), JsonPropertyOrder(5), YamlMember(Alias = "keys", Order = 5)]
53+
public virtual EquatableDictionary<string, string>? Keys { get; set; }
54+
4955
/// <summary>
5056
/// Gets/sets a boolean indicating whether or not to stream events. When enabled, each correlated event is atomically published to the subscriber immediately rather than waiting for the entire correlation to complete
5157
/// </summary>
52-
[DataMember(Name = "stream", Order = 5), JsonPropertyName("stream"), JsonPropertyOrder(5), YamlMember(Alias = "stream", Order = 5)]
58+
[DataMember(Name = "stream", Order = 6), JsonPropertyName("stream"), JsonPropertyOrder(6), YamlMember(Alias = "stream", Order = 6)]
5359
public virtual bool Stream { get; set; }
5460

5561
/// <summary>
5662
/// Gets/sets an object used to configure the correlation's outcome
5763
/// </summary>
58-
[DataMember(Name = "outcome", Order = 6), JsonPropertyName("outcome"), JsonPropertyOrder(6), YamlMember(Alias = "outcome", Order = 6)]
64+
[DataMember(Name = "outcome", Order = 7), JsonPropertyName("outcome"), JsonPropertyOrder(7), YamlMember(Alias = "outcome", Order = 7)]
5965
public virtual CorrelationOutcomeDefinition Outcome { get; set; } = null!;
6066

6167
}

src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs

+11-3
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ protected virtual async Task CorrelateEventAsync(CloudEvent e, CancellationToken
123123
{
124124
Id = Guid.NewGuid().ToString("N")[..12],
125125
Events = [new(filter.Key, e)],
126-
Keys = CorrelationKeys == null ? new() : new(CorrelationKeys)
126+
Keys = CorrelationKeys == null ? this.Correlation.Resource.Spec.Keys ?? [] : new(CorrelationKeys)
127127
};
128128
this.Logger.LogInformation("Correlation context with id '{contextId}' successfully created", context.Id);
129129
this.Logger.LogInformation("Event successfully correlated to context with id '{contextId}'", context.Id);
@@ -152,7 +152,7 @@ protected virtual async Task CorrelateEventAsync(CloudEvent e, CancellationToken
152152
{
153153
Id = Guid.NewGuid().ToString("N")[..12],
154154
Events = [new(filter.Key, e)],
155-
Keys = CorrelationKeys == null ? new() : new(CorrelationKeys)
155+
Keys = CorrelationKeys == null ? this.Correlation.Resource.Spec.Keys ?? [] : new(CorrelationKeys)
156156
};
157157
await this.CreateOrUpdateContextAsync(context, cancellationToken).ConfigureAwait(false);
158158
this.Logger.LogInformation("Correlation context with id '{contextId}' successfully created", context.Id);
@@ -289,7 +289,7 @@ protected virtual async Task<bool> TryFilterEventAsync(EventFilterDefinition fil
289289
protected virtual async Task<(bool Succeeded, IDictionary<string, string>? CorrelationKeys)> TryExtractCorrelationKeysAsync(CloudEvent e, IDictionary<string, CorrelationKeyDefinition>? keyDefinitions, CancellationToken cancellationToken)
290290
{
291291
ArgumentNullException.ThrowIfNull(e);
292-
var correlationKeys = new Dictionary<string, string>();
292+
var correlationKeys = this.Correlation.Resource.Spec.Keys ?? [];
293293
if (keyDefinitions == null || keyDefinitions.Count < 1) return (true, correlationKeys);
294294
foreach (var keyDefinition in keyDefinitions)
295295
{
@@ -305,6 +305,7 @@ protected virtual async Task<bool> TryFilterEventAsync(EventFilterDefinition fil
305305
}
306306
else if (!keyDefinition.Value.Expect.Equals(correlationTerm, StringComparison.OrdinalIgnoreCase)) return (false, null);
307307
}
308+
if (correlationKeys.ContainsKey(keyDefinition.Key) && correlationTerm != correlationKeys[keyDefinition.Key]) return (false, null);
308309
correlationKeys[keyDefinition.Key] = correlationTerm;
309310
}
310311
return (true, correlationKeys);
@@ -373,6 +374,13 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte
373374
{
374375
Definition = this.Correlation.Resource.Spec.Outcome.Start!.Workflow,
375376
Input = input
377+
},
378+
Status = new()
379+
{
380+
Correlation = new()
381+
{
382+
Keys = context.Keys
383+
}
376384
}
377385
};
378386
await this.Resources.AddAsync(workflowInstance, false, cancellationToken).ConfigureAwait(false);

src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs

+12
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,7 @@ public virtual async Task<CorrelationContext> CorrelateAsync(ITaskExecutionConte
449449
Source = new ResourceReference<WorkflowInstance>(task.Workflow.Instance.GetName(), task.Workflow.Instance.GetNamespace()),
450450
Lifetime = CorrelationLifetime.Ephemeral,
451451
Events = listenTask.Listen.To,
452+
Keys = this.Instance.Status?.Correlation?.Keys,
452453
Expressions = task.Workflow.Definition.Evaluate ?? new(),
453454
Outcome = new()
454455
{
@@ -511,6 +512,17 @@ public virtual async Task<CorrelationContext> CorrelateAsync(ITaskExecutionConte
511512
CompletedAt = DateTimeOffset.Now
512513
}
513514
}, cancellationToken).ConfigureAwait(false);
515+
using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false);
516+
this.Instance = await this.Api.WorkflowInstances.GetAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, cancellationToken).ConfigureAwait(false);
517+
var originalInstance = this.Instance.Clone();
518+
foreach(var correlationKey in correlationContext.Keys)
519+
{
520+
this.Instance.Status!.Correlation!.Keys ??= [];
521+
this.Instance.Status!.Correlation!.Keys[correlationKey.Key] = correlationKey.Value;
522+
}
523+
this.Instance.Status!.Correlation!.Contexts!.Remove(task.Instance.Reference.OriginalString);
524+
var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(originalInstance, this.Instance);
525+
this.Instance = await this.Api.WorkflowInstances.PatchStatusAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, new Patch(PatchType.JsonPatch, jsonPatch), null, cancellationToken).ConfigureAwait(false);
514526
return correlationContext;
515527
}
516528

src/runner/Synapse.Runner/Services/Executors/AsyncApiCallExecutor.cs

+38-21
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
using Neuroglia.AsyncApi.IO;
1919
using Neuroglia.AsyncApi.v3;
2020
using Neuroglia.Data.Expressions;
21+
using System.Threading;
2122

2223
namespace Synapse.Runner.Services.Executors;
2324

@@ -94,6 +95,11 @@ public class AsyncApiCallExecutor(IServiceProvider serviceProvider, ILogger<Asyn
9495
/// </summary>
9596
protected uint? Offset { get; set; }
9697

98+
/// <summary>
99+
/// Gets/sets a boolean indicating whether or not to keep consuming incoming messages
100+
/// </summary>
101+
protected bool KeepConsume { get; set; } = true;
102+
97103
/// <summary>
98104
/// Gets the path for the specified message
99105
/// </summary>
@@ -234,7 +240,7 @@ protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken
234240
if (this.AsyncApi.Subscription == null) throw new NullReferenceException("The 'subscription' must be set when performing an AsyncAPI v3 subscribe operation");
235241
await using var asyncApiClient = this.AsyncApiClientFactory.CreateFor(this.Document);
236242
var parameters = new AsyncApiSubscribeOperationParameters(this.Operation.Key, this.AsyncApi.Server, this.AsyncApi.Protocol);
237-
await using var result = await asyncApiClient.SubscribeAsync(parameters, cancellationToken).ConfigureAwait(false);
243+
var result = await asyncApiClient.SubscribeAsync(parameters, cancellationToken).ConfigureAwait(false);
238244
if (!result.IsSuccessful) throw new Exception("Failed to execute the AsyncAPI subscribe operation");
239245
if(result.Messages == null)
240246
{
@@ -244,24 +250,24 @@ protected virtual async Task DoExecuteSubscribeOperationAsync(CancellationToken
244250
var observable = result.Messages;
245251
if (this.AsyncApi.Subscription.Consume.For != null) observable = observable.TakeUntil(Observable.Timer(this.AsyncApi.Subscription.Consume.For.ToTimeSpan()));
246252
if (this.AsyncApi.Subscription.Consume.Amount.HasValue) observable = observable.Take(this.AsyncApi.Subscription.Consume.Amount.Value);
247-
else if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While)) observable = observable.Select(message => Observable.FromAsync(async () =>
248-
{
249-
var keepGoing = await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!,this.GetExpressionEvaluationArguments(),cancellationToken).ConfigureAwait(false);
250-
return (message, keepGoing);
251-
})).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message);
252-
else if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.Until)) observable = observable.Select(message => Observable.FromAsync(async () =>
253-
{
254-
var keepGoing = !(await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false));
255-
return (message, keepGoing);
256-
})).Concat().TakeWhile(i => i.keepGoing).Select(i => i.message);
257253
if (this.AsyncApi.Subscription.Foreach == null)
258254
{
259-
var messages = await observable.ToAsyncEnumerable().ToListAsync(cancellationToken).ConfigureAwait(false);
255+
var messages = await observable.ToAsyncEnumerable().TakeWhileAwait(async m =>
256+
{
257+
if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While)) return await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
258+
if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.Until)) return !await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
259+
return true;
260+
}).ToListAsync(cancellationToken).ConfigureAwait(false);
260261
await this.SetResultAsync(messages, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
261262
}
262263
else
263264
{
264-
this.Subscription = observable.SubscribeAsync(OnStreamingMessageAsync, OnStreamingErrorAsync, OnStreamingCompletedAsync);
265+
//todo: fix
266+
this.Subscription = observable.TakeWhile(_ => this.KeepConsume).SelectMany(m =>
267+
{
268+
OnStreamingMessageAsync(m).GetAwaiter().GetResult();
269+
return Observable.Return(m);
270+
}).SubscribeAsync(_ => System.Threading.Tasks.Task.CompletedTask, OnStreamingErrorAsync, OnStreamingCompletedAsync);
265271
}
266272
}
267273

@@ -274,6 +280,11 @@ protected virtual async Task OnStreamingMessageAsync(IAsyncApiMessage message)
274280
{
275281
if (this.AsyncApi == null || this.Document == null || this.Operation.Value == null) throw new InvalidOperationException("The executor must be initialized before execution");
276282
if (this.AsyncApi.Subscription == null) throw new NullReferenceException("The 'subscription' must be set when performing an AsyncAPI v3 subscribe operation");
283+
if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.While) && !await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.While, this.Task.Input!, this.GetExpressionEvaluationArguments(), this.CancellationTokenSource!.Token).ConfigureAwait(false))
284+
{
285+
this.KeepConsume = false;
286+
return;
287+
}
277288
if (this.AsyncApi.Subscription.Foreach?.Do != null)
278289
{
279290
var taskDefinition = new DoTaskDefinition()
@@ -284,30 +295,36 @@ protected virtual async Task OnStreamingMessageAsync(IAsyncApiMessage message)
284295
new(SynapseDefaults.Tasks.Metadata.PathPrefix.Name, false)
285296
]
286297
};
287-
var arguments = this.GetExpressionEvaluationArguments();
288298
var messageData = message as object;
299+
var offset = this.Offset ?? 0;
300+
if (!this.Offset.HasValue) this.Offset = 0;
301+
var arguments = this.GetExpressionEvaluationArguments();
302+
arguments ??= new Dictionary<string, object>();
303+
arguments[this.AsyncApi.Subscription.Foreach.Item ?? RuntimeExpressions.Arguments.Each] = messageData!;
304+
arguments[this.AsyncApi.Subscription.Foreach.At ?? RuntimeExpressions.Arguments.Index] = offset;
289305
if (this.AsyncApi.Subscription.Foreach.Output?.As is string fromExpression) messageData = await this.Task.Workflow.Expressions.EvaluateAsync<object>(fromExpression, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false);
290-
else if (this.AsyncApi.Subscription.Foreach.Output?.As != null) messageData = await this.Task.Workflow.Expressions.EvaluateAsync<object>(this.AsyncApi.Subscription.Foreach.Output.As, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false);
306+
else if (this.AsyncApi.Subscription.Foreach.Output?.As != null) messageData = await this.Task.Workflow.Expressions.EvaluateAsync<object>(this.AsyncApi.Subscription.Foreach.Output.As!, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false);
291307
if (this.AsyncApi.Subscription.Foreach.Export?.As is string toExpression)
292308
{
293309
var context = (await this.Task.Workflow.Expressions.EvaluateAsync<IDictionary<string, object>>(toExpression, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false))!;
294310
await this.Task.SetContextDataAsync(context, this.CancellationTokenSource!.Token).ConfigureAwait(false);
295311
}
296312
else if (this.AsyncApi.Subscription.Foreach.Export?.As != null)
297313
{
298-
var context = (await this.Task.Workflow.Expressions.EvaluateAsync<IDictionary<string, object>>(this.AsyncApi.Subscription.Foreach.Export.As, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false))!;
314+
var context = (await this.Task.Workflow.Expressions.EvaluateAsync<IDictionary<string, object>>(this.AsyncApi.Subscription.Foreach.Export.As!, messageData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false))!;
299315
await this.Task.SetContextDataAsync(context, this.CancellationTokenSource!.Token).ConfigureAwait(false);
300316
}
301-
var offset = this.Offset ?? 0;
302-
if (!this.Offset.HasValue) this.Offset = 0;
303-
arguments ??= new Dictionary<string, object>();
304-
arguments[this.AsyncApi.Subscription.Foreach.Item ?? RuntimeExpressions.Arguments.Each] = messageData!;
305-
arguments[this.AsyncApi.Subscription.Foreach.At ?? RuntimeExpressions.Arguments.Index] = offset;
306317
var task = await this.Task.Workflow.CreateTaskAsync(taskDefinition, this.GetPathFor(offset), this.Task.Input, null, this.Task, false, this.CancellationTokenSource!.Token).ConfigureAwait(false);
307318
var taskExecutor = await this.CreateTaskExecutorAsync(task, taskDefinition, this.Task.ContextData, arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false);
308319
await taskExecutor.ExecuteAsync(this.CancellationTokenSource!.Token).ConfigureAwait(false);
320+
if (this.Task.ContextData != taskExecutor.Task.ContextData) await this.Task.SetContextDataAsync(taskExecutor.Task.ContextData, this.CancellationTokenSource!.Token).ConfigureAwait(false);
309321
this.Offset++;
310322
}
323+
if (!string.IsNullOrWhiteSpace(this.AsyncApi.Subscription.Consume.Until) && await this.Task.Workflow.Expressions.EvaluateConditionAsync(this.AsyncApi.Subscription.Consume.Until, this.Task.Input!, this.GetExpressionEvaluationArguments(), this.CancellationTokenSource!.Token).ConfigureAwait(false))
324+
{
325+
this.KeepConsume = false;
326+
return;
327+
}
311328
}
312329

313330
/// <summary>

0 commit comments

Comments
 (0)