Skip to content

Commit 71d9adb

Browse files
authoredOct 24, 2024··
Merge pull request #440 from serverlessworkflow/fix-watch-and-monitor-client
Fixed the API client to use SSEs for watching and monitoring resources
2 parents 03eae9d + 20d6fac commit 71d9adb

File tree

11 files changed

+81
-34
lines changed

11 files changed

+81
-34
lines changed
 

‎src/api/Synapse.Api.Client.Core/Services/IClusterResourceApiClient.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,15 @@ public interface IClusterResourceApiClient<TResource>
3838
/// <param name="labelSelectors">Defines the expected labels, if any, of the resources to watch</param>
3939
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
4040
/// <returns>A new <see cref="IAsyncEnumerable{T}"/> used to asynchronously enumerate resulting <see cref="IResourceWatchEvent"/>s</returns>
41-
Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> WatchAsync(IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default);
41+
IAsyncEnumerable<IResourceWatchEvent<TResource>> WatchAsync(IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default);
4242

4343
/// <summary>
4444
/// Monitors the resource with the specified name
4545
/// </summary>
4646
/// <param name="name">The name of the resource to monitor</param>
4747
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
4848
/// <returns>A new <see cref="IAsyncEnumerable{T}"/> used to asynchronously enumerate resulting <see cref="IResourceWatchEvent"/>s</returns>
49-
Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> MonitorAsync(string name, CancellationToken cancellationToken = default);
49+
IAsyncEnumerable<IResourceWatchEvent<TResource>> MonitorAsync(string name, CancellationToken cancellationToken = default);
5050

5151
/// <summary>
5252
/// Gets the resource with the specified name

‎src/api/Synapse.Api.Client.Core/Services/INamespacedResourceApiClient.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public interface INamespacedResourceApiClient<TResource>
4040
/// <param name="labelSelectors">Defines the expected labels, if any, of the resources to watch</param>
4141
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
4242
/// <returns>A new <see cref="IAsyncEnumerable{T}"/> used to asynchronously enumerate resulting <see cref="IResourceWatchEvent"/>s</returns>
43-
Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> WatchAsync(string? @namespace = null, IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default);
43+
IAsyncEnumerable<IResourceWatchEvent<TResource>> WatchAsync(string? @namespace = null, IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default);
4444

4545
/// <summary>
4646
/// Monitors the resource with the specified name
@@ -49,7 +49,7 @@ public interface INamespacedResourceApiClient<TResource>
4949
/// <param name="namespace">The namespace the resource to monitor belongs to</param>
5050
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
5151
/// <returns>A new <see cref="IAsyncEnumerable{T}"/> used to asynchronously enumerate resulting <see cref="IResourceWatchEvent"/>s</returns>
52-
Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> MonitorAsync(string name, string @namespace, CancellationToken cancellationToken = default);
52+
IAsyncEnumerable<IResourceWatchEvent<TResource>> MonitorAsync(string name, string @namespace, CancellationToken cancellationToken = default);
5353

5454
/// <summary>
5555
/// Gets the resource with the specified name

‎src/api/Synapse.Api.Client.Http/Services/ResourceHttpApiClient.cs

+46-15
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
// See the License for the specific language governing permissions and
1212
// limitations under the License.
1313

14+
using System.Runtime.CompilerServices;
15+
1416
namespace Synapse.Api.Client.Services;
1517

1618
/// <summary>
@@ -105,60 +107,89 @@ public virtual async Task<IAsyncEnumerable<TResource>> ListAsync(IEnumerable<Lab
105107
}
106108

107109
/// <inheritdoc/>
108-
public virtual async Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> WatchAsync(string? @namespace = null, IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default)
110+
public virtual async IAsyncEnumerable<IResourceWatchEvent<TResource>> WatchAsync(string? @namespace = null, IEnumerable<LabelSelector>? labelSelectors = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
109111
{
110112
var resource = new TResource();
111-
var uri = string.IsNullOrWhiteSpace(@namespace) ? $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/watch" : $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/watch";
113+
var uri = string.IsNullOrWhiteSpace(@namespace) ? $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/watch/sse" : $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/watch";
112114
var queryStringArguments = new Dictionary<string, string>();
113115
if (labelSelectors?.Any() == true) queryStringArguments.Add("labelSelector", labelSelectors.Select(s => s.ToString()).Join(','));
114116
if (queryStringArguments.Count != 0) uri += $"?{queryStringArguments.Select(kvp => $"{kvp.Key}={kvp.Value}").Join('&')}";
115117
using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false);
116-
request.EnableWebAssemblyStreamingResponse();
117118
var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
118119
var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
119-
return this.JsonSerializer.DeserializeAsyncEnumerable<ResourceWatchEvent<TResource>>(responseStream, cancellationToken)!;
120+
using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync());
121+
while (!streamReader.EndOfStream)
122+
{
123+
var sseMessage = await streamReader.ReadLineAsync();
124+
if (string.IsNullOrWhiteSpace(sseMessage)) continue;
125+
var json = sseMessage["data: ".Length..].Trim();
126+
var e = JsonSerializer.Deserialize<ResourceWatchEvent<TResource>>(json)!;
127+
yield return e;
128+
}
120129
}
121130

122131
/// <inheritdoc/>
123-
public virtual async Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> WatchAsync(IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default)
132+
public virtual async IAsyncEnumerable<IResourceWatchEvent<TResource>> WatchAsync(IEnumerable<LabelSelector>? labelSelectors = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
124133
{
125134
var resource = new TResource();
126-
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/watch";
135+
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/watch/sse";
127136
var queryStringArguments = new Dictionary<string, string>();
128137
if (labelSelectors?.Any() == true) queryStringArguments.Add("labelSelector", labelSelectors.Select(s => s.ToString()).Join(','));
129138
if (queryStringArguments.Count != 0) uri += $"?{queryStringArguments.Select(kvp => $"{kvp.Key}={kvp.Value}").Join('&')}";
130139
using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false);
131140
request.EnableWebAssemblyStreamingResponse();
132141
var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
133142
var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
134-
return this.JsonSerializer.DeserializeAsyncEnumerable<ResourceWatchEvent<TResource>>(responseStream, cancellationToken)!;
143+
using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync());
144+
while (!streamReader.EndOfStream)
145+
{
146+
var sseMessage = await streamReader.ReadLineAsync();
147+
if (string.IsNullOrWhiteSpace(sseMessage)) continue;
148+
var json = sseMessage["data: ".Length..].Trim();
149+
var e = JsonSerializer.Deserialize<ResourceWatchEvent<TResource>>(json)!;
150+
yield return e;
151+
}
135152
}
136153

137154
/// <inheritdoc/>
138-
public virtual async Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> MonitorAsync(string name, string @namespace, CancellationToken cancellationToken = default)
155+
public virtual async IAsyncEnumerable<IResourceWatchEvent<TResource>> MonitorAsync(string name, string @namespace, [EnumeratorCancellation]CancellationToken cancellationToken = default)
139156
{
140157
ArgumentException.ThrowIfNullOrWhiteSpace(name);
141158
ArgumentException.ThrowIfNullOrWhiteSpace(@namespace);
142159
var resource = new TResource();
143-
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}/monitor";
160+
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{@namespace}/{name}/monitor/sse";
144161
using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false);
145-
request.EnableWebAssemblyStreamingResponse();
146162
var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
147163
var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
148-
return this.JsonSerializer.DeserializeAsyncEnumerable<ResourceWatchEvent<TResource>>(responseStream, cancellationToken)!;
164+
using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync());
165+
while (!streamReader.EndOfStream)
166+
{
167+
var sseMessage = await streamReader.ReadLineAsync();
168+
if (string.IsNullOrWhiteSpace(sseMessage)) continue;
169+
var json = sseMessage["data: ".Length..].Trim();
170+
var e = JsonSerializer.Deserialize<ResourceWatchEvent<TResource>>(json)!;
171+
yield return e;
172+
}
149173
}
150174

151175
/// <inheritdoc/>
152-
public virtual async Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> MonitorAsync(string name, CancellationToken cancellationToken = default)
176+
public virtual async IAsyncEnumerable<IResourceWatchEvent<TResource>> MonitorAsync(string name, [EnumeratorCancellation]CancellationToken cancellationToken = default)
153177
{
154178
ArgumentException.ThrowIfNullOrWhiteSpace(name);
155179
var resource = new TResource();
156-
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{name}/monitor";
180+
var uri = $"/api/{resource.Definition.Version}/{resource.Definition.Plural}/{name}/monitor/sse";
157181
using var request = await this.ProcessRequestAsync(new HttpRequestMessage(HttpMethod.Get, uri), cancellationToken).ConfigureAwait(false);
158-
request.EnableWebAssemblyStreamingResponse();
159182
var response = await this.HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
160183
var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
161-
return this.JsonSerializer.DeserializeAsyncEnumerable<ResourceWatchEvent<TResource>>(responseStream, cancellationToken)!;
184+
using var streamReader = new StreamReader(await response.Content.ReadAsStreamAsync());
185+
while (!streamReader.EndOfStream)
186+
{
187+
var sseMessage = await streamReader.ReadLineAsync();
188+
if (string.IsNullOrWhiteSpace(sseMessage)) continue;
189+
var json = sseMessage["data: ".Length..].Trim();
190+
var e = JsonSerializer.Deserialize<ResourceWatchEvent<TResource>>(json)!;
191+
yield return e;
192+
}
162193
}
163194

164195
/// <inheritdoc/>

‎src/api/Synapse.Api.Http/ClusterResourceController.cs

+2
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ public virtual async Task<IActionResult> WatchResourcesUsingSSE(string? labelSel
108108
this.Response.Headers.ContentType = "text/event-stream";
109109
this.Response.Headers.CacheControl = "no-cache";
110110
this.Response.Headers.Connection = "keep-alive";
111+
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
111112
await foreach (var e in response.Data!)
112113
{
113114
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n";
@@ -147,6 +148,7 @@ public virtual async Task<IActionResult> MonitorResourceUsingSSE(string name, Ca
147148
this.Response.Headers.ContentType = "text/event-stream";
148149
this.Response.Headers.CacheControl = "no-cache";
149150
this.Response.Headers.Connection = "keep-alive";
151+
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
150152
await foreach (var e in response.Data!)
151153
{
152154
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n";

‎src/api/Synapse.Api.Http/NamespacedResourceController.cs

+2
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ public virtual async Task<IActionResult> WatchResourcesUsingSSE(string @namespac
170170
this.Response.Headers.ContentType = "text/event-stream";
171171
this.Response.Headers.CacheControl = "no-cache";
172172
this.Response.Headers.Connection = "keep-alive";
173+
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
173174
await foreach (var e in response.Data!)
174175
{
175176
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n";
@@ -211,6 +212,7 @@ public virtual async Task<IActionResult> MonitorResourceUsingSSE(string name, st
211212
this.Response.Headers.ContentType = "text/event-stream";
212213
this.Response.Headers.CacheControl = "no-cache";
213214
this.Response.Headers.Connection = "keep-alive";
215+
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
214216
await foreach(var e in response.Data!)
215217
{
216218
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\\n\\n";

‎src/cli/Synapse.Cli/Commands/WorkflowInstances/MonitorWorkflowInstancesCommand.cs

+1-2
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ public MonitorWorkflowInstancesCommand(IServiceProvider serviceProvider, ILogger
6464
public async Task HandleAsync(string name, string @namespace, string output)
6565
{
6666
this.EnsureConfigured();
67-
var enumerable = await this.Api.WorkflowInstances.MonitorAsync(name, @namespace);
68-
await foreach (var e in enumerable)
67+
await foreach (var e in this.Api.WorkflowInstances.MonitorAsync(name, @namespace))
6968
{
7069
string outputText = output.ToLowerInvariant() switch
7170
{

‎src/dashboard/Synapse.Dashboard/Components/ResourceEditor/ResourceEditor.razor

+1-1
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@
258258
await this.JSRuntime.InvokeVoidAsync("navigator.clipboard.writeText", text);
259259
this.ToastService.Notify(new(ToastType.Success, "Copied to the clipboard!"));
260260
}
261-
catch (Exception ex)
261+
catch
262262
{
263263
this.ToastService.Notify(new(ToastType.Danger, "Failed to copy the definition to the clipboard."));
264264
}

‎src/runner/Synapse.Runner/Services/Executors/WorkflowProcessExecutor.cs

+4-5
Original file line numberDiff line numberDiff line change
@@ -93,17 +93,16 @@ await this.SetErrorAsync(new()
9393
{
9494
Definition = new()
9595
{
96-
Namespace = this.ProcessDefinition.Namespace,
97-
Name = this.ProcessDefinition.Name,
98-
Version = this.ProcessDefinition.Version
96+
Namespace = workflowDefinition.Document.Namespace,
97+
Name = workflowDefinition.Document.Name,
98+
Version = workflowDefinition.Document.Version
9999
},
100100
Input = input
101101
}
102102
};
103103
workflowInstance = await this.Api.WorkflowInstances.CreateAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
104104
}
105-
var watchEvents = await this.Api.WorkflowInstances.MonitorAsync(workflowInstance.GetName(), workflowInstance.GetNamespace()!, cancellationToken).ConfigureAwait(false);
106-
await foreach(var watchEvent in watchEvents)
105+
await foreach(var watchEvent in this.Api.WorkflowInstances.MonitorAsync(workflowInstance.GetName(), workflowInstance.GetNamespace()!, cancellationToken))
107106
{
108107
switch (watchEvent.Resource.Status?.Phase)
109108
{

‎src/runner/Synapse.Runner/Services/WorkflowExecutionContext.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ public virtual async Task<CorrelationContext> CorrelateAsync(ITaskExecutionConte
335335
}
336336
var taskCompletionSource = new TaskCompletionSource<CorrelationContext>();
337337
using var cancellationTokenRegistration = cancellationToken.Register(() => taskCompletionSource.TrySetCanceled());
338-
using var subscription = (await this.Api.WorkflowInstances.MonitorAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, cancellationToken))
338+
using var subscription = this.Api.WorkflowInstances.MonitorAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, cancellationToken)
339339
.ToObservable()
340340
.Where(e => e.Type == ResourceWatchEventType.Updated)
341341
.Select(e => e.Resource.Status?.Correlation?.Contexts)

‎tests/Synapse.UnitTests/Services/MockClusterResourceApiClient.cs

+11-4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
using Neuroglia.Data.Infrastructure.ResourceOriented;
1616
using Neuroglia.Data.Infrastructure.ResourceOriented.Services;
1717
using Synapse.Api.Client.Services;
18+
using System.Runtime.CompilerServices;
1819

1920
namespace Synapse.UnitTests.Services;
2021

@@ -26,10 +27,16 @@ internal class MockClusterResourceApiClient<TResource>(IResourceRepository resou
2627
public Task<TResource> CreateAsync(TResource resource, CancellationToken cancellationToken = default) => resources.AddAsync(resource, false, cancellationToken);
2728

2829
public Task<IAsyncEnumerable<TResource>> ListAsync(IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default) => Task.FromResult(resources.GetAllAsync<TResource>(null, labelSelectors, cancellationToken)!);
29-
30-
public async Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> WatchAsync(IEnumerable<LabelSelector>? labelSelectors = null, CancellationToken cancellationToken = default) => (await resources.WatchAsync<TResource>(null!, labelSelectors, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable();
31-
32-
public async Task<IAsyncEnumerable<IResourceWatchEvent<TResource>>> MonitorAsync(string name, CancellationToken cancellationToken = default) => (await resources.MonitorAsync<TResource>(name, null!, false, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable();
30+
31+
public async IAsyncEnumerable<IResourceWatchEvent<TResource>> WatchAsync(IEnumerable<LabelSelector>? labelSelectors = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
32+
{
33+
await foreach (var e in (await resources.WatchAsync<TResource>(null!, labelSelectors, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable()) yield return e;
34+
}
35+
36+
public async IAsyncEnumerable<IResourceWatchEvent<TResource>> MonitorAsync(string name, [EnumeratorCancellation] CancellationToken cancellationToken = default)
37+
{
38+
await foreach(var e in (await resources.MonitorAsync<TResource>(name, null!, false, cancellationToken).ConfigureAwait(false)).ToAsyncEnumerable()) yield return e;
39+
}
3340

3441
public Task<TResource> GetAsync(string name, CancellationToken cancellationToken = default) => resources.GetAsync<TResource>(name, null, cancellationToken)!;
3542

0 commit comments

Comments
 (0)
Please sign in to comment.