Skip to content

Commit 370a883

Browse files
authored
Merge pull request #492 from serverlessworkflow/fix-correlation-handling
Fix correlation handling
2 parents fa81628 + e149870 commit 370a883

File tree

7 files changed

+73
-31
lines changed

7 files changed

+73
-31
lines changed

Synapse.sln

-3
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Runtime.Kubernetes"
134134
EndProject
135135
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Synapse.Core.Infrastructure.Containers.Docker", "src\core\Synapse.Core.Infrastructure.Containers.Docker\Synapse.Core.Infrastructure.Containers.Docker.csproj", "{DD6381BD-2C8B-4CE1-99B2-EC585DD818FA}"
136136
EndProject
137-
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "kubernetes", "kubernetes", "{B3F3DB1B-23E7-45FA-8934-448BFFB294E8}"
138-
EndProject
139137
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Synapse.Core.Infrastructure.Containers.Kubernetes", "src\core\Synapse.Core.Infrastructure.Containers.Kubernetes\Synapse.Core.Infrastructure.Containers.Kubernetes.csproj", "{41C99069-BD99-4FD2-BF33-984CF03B53E8}"
140138
EndProject
141139
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".github", ".github", "{35D495F4-D267-4A84-9479-DB3C1BE85434}"
@@ -292,7 +290,6 @@ Global
292290
{8FF58403-9E13-4F58-864F-E6FBC877BF37} = {175CE1C5-FE17-4C8B-8823-E812BAD4E527}
293291
{9B37AA4A-A342-4A41-A2A1-C8466825A70A} = {175CE1C5-FE17-4C8B-8823-E812BAD4E527}
294292
{DD6381BD-2C8B-4CE1-99B2-EC585DD818FA} = {9E296C8A-4D78-4592-B046-11A3A953FD25}
295-
{B3F3DB1B-23E7-45FA-8934-448BFFB294E8} = {562C91A3-6E91-4489-9D9D-064E7436D900}
296293
{41C99069-BD99-4FD2-BF33-984CF03B53E8} = {9E296C8A-4D78-4592-B046-11A3A953FD25}
297294
{AB30A91B-0158-411D-9BD3-36FFA441B3A2} = {35D495F4-D267-4A84-9479-DB3C1BE85434}
298295
{06404855-A5BE-4556-91BC-064630E95737} = {35D495F4-D267-4A84-9479-DB3C1BE85434}

src/api/Synapse.Api.Http/Extensions/IServiceCollectionExtensions.cs

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
using Synapse.Api.Http.Controllers;
2121
using Synapse.Core.Api.Services;
2222
using System.Text.Json;
23+
using System.Text.Json.Serialization;
2324

2425
namespace Synapse.Api.Http;
2526

@@ -44,6 +45,7 @@ public static IServiceCollection AddSynapseHttpApi(this IServiceCollection servi
4445
.AddJsonOptions(options =>
4546
{
4647
options.JsonSerializerOptions.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
48+
options.JsonSerializerOptions.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull | JsonIgnoreCondition.WhenWritingDefault;
4749
})
4850
.AddApplicationPart(typeof(WorkflowsController).Assembly);
4951
services.AddIdentityServer(options =>

src/api/Synapse.Api.Server/appsettings.Development.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,6 @@
1919
}
2020
},
2121
"CloudEvents": {
22-
"Endpoint": "https://webhook.site/a4aff725-0711-48b2-a9d2-5d1b806d04d0"
22+
"Endpoint": "http://localhost:5151/api/events/pub"
2323
}
2424
}

src/core/Synapse.Core/Resources/CorrelationSpec.cs

+8-2
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,22 @@ public record CorrelationSpec
4646
[DataMember(Name = "events", Order = 4), JsonPropertyName("events"), JsonPropertyOrder(4), YamlMember(Alias = "events", Order = 4)]
4747
public virtual EventConsumptionStrategyDefinition Events { get; set; } = null!;
4848

49+
/// <summary>
50+
/// Gets/sets a key/value mapping, if any, of the keys to use to correlate events
51+
/// </summary>
52+
[DataMember(Name = "keys", Order = 5), JsonPropertyName("keys"), JsonPropertyOrder(5), YamlMember(Alias = "keys", Order = 5)]
53+
public virtual EquatableDictionary<string, string>? Keys { get; set; }
54+
4955
/// <summary>
5056
/// Gets/sets a boolean indicating whether or not to stream events. When enabled, each correlated event is atomically published to the subscriber immediately rather than waiting for the entire correlation to complete
5157
/// </summary>
52-
[DataMember(Name = "stream", Order = 5), JsonPropertyName("stream"), JsonPropertyOrder(5), YamlMember(Alias = "stream", Order = 5)]
58+
[DataMember(Name = "stream", Order = 6), JsonPropertyName("stream"), JsonPropertyOrder(6), YamlMember(Alias = "stream", Order = 6)]
5359
public virtual bool Stream { get; set; }
5460

5561
/// <summary>
5662
/// Gets/sets an object used to configure the correlation's outcome
5763
/// </summary>
58-
[DataMember(Name = "outcome", Order = 6), JsonPropertyName("outcome"), JsonPropertyOrder(6), YamlMember(Alias = "outcome", Order = 6)]
64+
[DataMember(Name = "outcome", Order = 7), JsonPropertyName("outcome"), JsonPropertyOrder(7), YamlMember(Alias = "outcome", Order = 7)]
5965
public virtual CorrelationOutcomeDefinition Outcome { get; set; } = null!;
6066

6167
}

src/correlator/Synapse.Correlator/Services/CorrelationHandler.cs

+12-4
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ protected virtual async Task CorrelateEventAsync(CloudEvent e, CancellationToken
123123
{
124124
Id = Guid.NewGuid().ToString("N")[..12],
125125
Events = [new(filter.Key, e)],
126-
Keys = CorrelationKeys == null ? new() : new(CorrelationKeys)
126+
Keys = CorrelationKeys == null ? this.Correlation.Resource.Spec.Keys ?? [] : new(CorrelationKeys)
127127
};
128128
this.Logger.LogInformation("Correlation context with id '{contextId}' successfully created", context.Id);
129129
this.Logger.LogInformation("Event successfully correlated to context with id '{contextId}'", context.Id);
@@ -152,7 +152,7 @@ protected virtual async Task CorrelateEventAsync(CloudEvent e, CancellationToken
152152
{
153153
Id = Guid.NewGuid().ToString("N")[..12],
154154
Events = [new(filter.Key, e)],
155-
Keys = CorrelationKeys == null ? new() : new(CorrelationKeys)
155+
Keys = CorrelationKeys == null ? this.Correlation.Resource.Spec.Keys ?? [] : new(CorrelationKeys)
156156
};
157157
await this.CreateOrUpdateContextAsync(context, cancellationToken).ConfigureAwait(false);
158158
this.Logger.LogInformation("Correlation context with id '{contextId}' successfully created", context.Id);
@@ -289,7 +289,7 @@ protected virtual async Task<bool> TryFilterEventAsync(EventFilterDefinition fil
289289
protected virtual async Task<(bool Succeeded, IDictionary<string, string>? CorrelationKeys)> TryExtractCorrelationKeysAsync(CloudEvent e, IDictionary<string, CorrelationKeyDefinition>? keyDefinitions, CancellationToken cancellationToken)
290290
{
291291
ArgumentNullException.ThrowIfNull(e);
292-
var correlationKeys = new Dictionary<string, string>();
292+
var correlationKeys = this.Correlation.Resource.Spec.Keys ?? [];
293293
if (keyDefinitions == null || keyDefinitions.Count < 1) return (true, correlationKeys);
294294
foreach (var keyDefinition in keyDefinitions)
295295
{
@@ -305,6 +305,7 @@ protected virtual async Task<bool> TryFilterEventAsync(EventFilterDefinition fil
305305
}
306306
else if (!keyDefinition.Value.Expect.Equals(correlationTerm, StringComparison.OrdinalIgnoreCase)) return (false, null);
307307
}
308+
if (correlationKeys.ContainsKey(keyDefinition.Key) && correlationTerm != correlationKeys[keyDefinition.Key]) return (false, null);
308309
correlationKeys[keyDefinition.Key] = correlationTerm;
309310
}
310311
return (true, correlationKeys);
@@ -361,7 +362,7 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte
361362
await this.Resources.PatchStatusAsync<WorkflowInstance>(new(PatchType.JsonPatch, patch), workflowInstance.GetName(), workflowInstance.GetNamespace(), null, false, cancellationToken).ConfigureAwait(false);
362363
break;
363364
case CorrelationOutcomeType.Start:
364-
var input = this.Correlation.Resource.Spec.Outcome.Start!.Input == null ? [] : await this.ExpressionEvaluator.EvaluateAsync<EquatableDictionary<string, object>>(this.Correlation.Resource.Spec.Outcome.Start!.Input!, context, cancellationToken: cancellationToken).ConfigureAwait(false);
365+
var input = (this.Correlation.Resource.Spec.Outcome.Start!.Input == null ? new() { { "events", context.Events.Values } } : await this.ExpressionEvaluator.EvaluateAsync<EquatableDictionary<string, object>>(this.Correlation.Resource.Spec.Outcome.Start!.Input!, context, cancellationToken: cancellationToken).ConfigureAwait(false));
365366
workflowInstance = new()
366367
{
367368
Metadata = new()
@@ -373,6 +374,13 @@ protected virtual async Task CreateOrUpdateContextAsync(CorrelationContext conte
373374
{
374375
Definition = this.Correlation.Resource.Spec.Outcome.Start!.Workflow,
375376
Input = input
377+
},
378+
Status = new()
379+
{
380+
Correlation = new()
381+
{
382+
Keys = context.Keys
383+
}
376384
}
377385
};
378386
await this.Resources.AddAsync(workflowInstance, false, cancellationToken).ConfigureAwait(false);

src/runner/Synapse.Runner/Services/ConnectedWorkflowExecutionContext.cs

+12
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,7 @@ public virtual async Task<CorrelationContext> CorrelateAsync(ITaskExecutionConte
449449
Source = new ResourceReference<WorkflowInstance>(task.Workflow.Instance.GetName(), task.Workflow.Instance.GetNamespace()),
450450
Lifetime = CorrelationLifetime.Ephemeral,
451451
Events = listenTask.Listen.To,
452+
Keys = this.Instance.Status?.Correlation?.Keys,
452453
Expressions = task.Workflow.Definition.Evaluate ?? new(),
453454
Outcome = new()
454455
{
@@ -511,6 +512,17 @@ public virtual async Task<CorrelationContext> CorrelateAsync(ITaskExecutionConte
511512
CompletedAt = DateTimeOffset.Now
512513
}
513514
}, cancellationToken).ConfigureAwait(false);
515+
using var @lock = await this.Lock.LockAsync(cancellationToken).ConfigureAwait(false);
516+
this.Instance = await this.Api.WorkflowInstances.GetAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, cancellationToken).ConfigureAwait(false);
517+
var originalInstance = this.Instance.Clone();
518+
foreach(var correlationKey in correlationContext.Keys)
519+
{
520+
this.Instance.Status!.Correlation!.Keys ??= [];
521+
this.Instance.Status!.Correlation!.Keys[correlationKey.Key] = correlationKey.Value;
522+
}
523+
this.Instance.Status!.Correlation!.Contexts!.Remove(task.Instance.Reference.OriginalString);
524+
var jsonPatch = JsonPatchUtility.CreateJsonPatchFromDiff(originalInstance, this.Instance);
525+
this.Instance = await this.Api.WorkflowInstances.PatchStatusAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, new Patch(PatchType.JsonPatch, jsonPatch), null, cancellationToken).ConfigureAwait(false);
514526
return correlationContext;
515527
}
516528

0 commit comments

Comments
 (0)