Skip to content

Commit eab800f

Browse files
committed
Enhance StreamResponse handling and update dependencies
Updated `ResponseBuilderDefaults` to include `StreamResponse` in `SpecialTypes`. Refactored `SetBodyCoreAsync` in `DefaultResponseBuilder.cs` for readability and removed unnecessary `using` statements. Modified `RequestCoreAsync` in `HttpWebRequestInvoker.cs` and `BuildResponseAsync` in `InMemoryRequestInvoker.cs` to handle `StreamResponse` types with proper disposal. Updated `Elastic.Transport.csproj` to reference `System.Text.Json` version `8.0.5`.
1 parent b65498c commit eab800f

File tree

4 files changed

+77
-70
lines changed

4 files changed

+77
-70
lines changed

src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs

Lines changed: 46 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ internal static class ResponseBuilderDefaults
2626

2727
public static readonly Type[] SpecialTypes =
2828
{
29-
typeof(StringResponse), typeof(BytesResponse), typeof(VoidResponse), typeof(DynamicResponse)
29+
typeof(StringResponse), typeof(BytesResponse), typeof(VoidResponse), typeof(DynamicResponse), typeof(StreamResponse)
3030
};
3131
}
3232

@@ -224,68 +224,65 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
224224
details.ResponseBodyInBytes = bytes;
225225
}
226226

227-
using (responseStream)
228-
{
229-
if (SetSpecialTypes<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var r)) return r;
227+
if (SetSpecialTypes<TResponse>(mimeType, bytes, responseStream, requestData.MemoryStreamFactory, out var r)) return r;
230228

231-
if (details.HttpStatusCode.HasValue &&
232-
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
233-
return null;
229+
if (details.HttpStatusCode.HasValue &&
230+
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
231+
return null;
234232

235-
var serializer = requestData.ConnectionSettings.RequestResponseSerializer;
233+
var serializer = requestData.ConnectionSettings.RequestResponseSerializer;
236234

237-
TResponse response;
238-
if (requestData.CustomResponseBuilder != null)
239-
{
240-
var beforeTicks = Stopwatch.GetTimestamp();
235+
TResponse response;
236+
if (requestData.CustomResponseBuilder != null)
237+
{
238+
var beforeTicks = Stopwatch.GetTimestamp();
241239

242-
if (isAsync)
243-
response = await requestData.CustomResponseBuilder
244-
.DeserializeResponseAsync(serializer, details, responseStream, cancellationToken)
245-
.ConfigureAwait(false) as TResponse;
246-
else
247-
response = requestData.CustomResponseBuilder
248-
.DeserializeResponse(serializer, details, responseStream) as TResponse;
240+
if (isAsync)
241+
response = await requestData.CustomResponseBuilder
242+
.DeserializeResponseAsync(serializer, details, responseStream, cancellationToken)
243+
.ConfigureAwait(false) as TResponse;
244+
else
245+
response = requestData.CustomResponseBuilder
246+
.DeserializeResponse(serializer, details, responseStream) as TResponse;
249247

250-
var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
251-
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
252-
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);
248+
var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
249+
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
250+
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);
253251

254-
return response;
255-
}
252+
return response;
253+
}
256254

257-
// TODO: Handle empty data in a nicer way as throwing exceptions has a cost we'd like to avoid!
258-
// ie. check content-length (add to ApiCallDetails)? Content-length cannot be retrieved from a GZip content stream which is annoying.
259-
try
255+
// TODO: Handle empty data in a nicer way as throwing exceptions has a cost we'd like to avoid!
256+
// ie. check content-length (add to ApiCallDetails)? Content-length cannot be retrieved from a GZip content stream which is annoying.
257+
try
258+
{
259+
if (requiresErrorDeserialization && TryGetError(details, requestData, responseStream, out var error) && error.HasError())
260260
{
261-
if (requiresErrorDeserialization && TryGetError(details, requestData, responseStream, out var error) && error.HasError())
262-
{
263-
response = new TResponse();
264-
SetErrorOnResponse(response, error);
265-
return response;
266-
}
261+
response = new TResponse();
262+
SetErrorOnResponse(response, error);
263+
return response;
264+
}
267265

268-
if (!requestData.ValidateResponseContentType(mimeType))
269-
return default;
266+
if (!requestData.ValidateResponseContentType(mimeType))
267+
return default;
270268

271-
var beforeTicks = Stopwatch.GetTimestamp();
269+
var beforeTicks = Stopwatch.GetTimestamp();
272270

273-
if (isAsync)
274-
response = await serializer.DeserializeAsync<TResponse>(responseStream, cancellationToken).ConfigureAwait(false);
275-
else
276-
response = serializer.Deserialize<TResponse>(responseStream);
271+
if (isAsync)
272+
response = await serializer.DeserializeAsync<TResponse>(responseStream, cancellationToken).ConfigureAwait(false);
273+
else
274+
response = serializer.Deserialize<TResponse>(responseStream);
277275

278-
var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
276+
var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
279277

280-
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
281-
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);
278+
if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
279+
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);
282280

283-
return response;
284-
}
285-
catch (JsonException ex) when (ex.Message.Contains("The input does not contain any JSON tokens"))
286-
{
287-
return default;
288-
}
281+
return response;
282+
}
283+
catch (JsonException ex) when (ex.Message.Contains("The input does not contain any JSON tokens"))
284+
{
285+
return default;
289286
}
290287
}
291288

src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -161,28 +161,32 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Req
161161
{
162162
unregisterWaitHandle?.Invoke();
163163
}
164-
responseStream ??= Stream.Null;
165164

166-
TResponse response;
165+
var isStreamResponse = typeof(TResponse) == typeof(StreamResponse);
167166

168-
if (isAsync)
169-
response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync<TResponse>
170-
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
171-
.ConfigureAwait(false);
172-
else
173-
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
174-
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);
175-
176-
if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
167+
using (isStreamResponse ? Stream.Null : responseStream ??= Stream.Null)
177168
{
178-
var attributes = requestData.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);
179-
foreach (var attribute in attributes)
169+
TResponse response;
170+
171+
if (isAsync)
172+
response = await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponseAsync<TResponse>
173+
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats, cancellationToken)
174+
.ConfigureAwait(false);
175+
else
176+
response = requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>
177+
(requestData, ex, statusCode, responseHeaders, responseStream, mimeType, contentLength, threadPoolStats, tcpStats);
178+
179+
if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
180180
{
181-
Activity.Current?.SetTag(attribute.Key, attribute.Value);
181+
var attributes = requestData.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);
182+
foreach (var attribute in attributes)
183+
{
184+
Activity.Current?.SetTag(attribute.Key, attribute.Value);
185+
}
182186
}
183-
}
184187

185-
return response;
188+
return response;
189+
}
186190
}
187191

188192
private static Dictionary<string, IEnumerable<string>> ParseHeaders(RequestData requestData, HttpWebResponse responseMessage, Dictionary<string, IEnumerable<string>> responseHeaders)

src/Elastic.Transport/Components/TransportClient/InMemoryRequestInvoker.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,16 @@ public async Task<TResponse> BuildResponseAsync<TResponse>(RequestData requestDa
109109
requestData.MadeItToResponse = true;
110110

111111
var sc = statusCode ?? _statusCode;
112-
Stream s = body != null ? requestData.MemoryStreamFactory.Create(body) : requestData.MemoryStreamFactory.Create(EmptyBody);
113-
return await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder
114-
.ToResponseAsync<TResponse>(requestData, _exception, sc, _headers, s, contentType ?? _contentType, body?.Length ?? 0, null, null, cancellationToken)
112+
113+
Stream responseStream = body != null ? requestData.MemoryStreamFactory.Create(body) : requestData.MemoryStreamFactory.Create(EmptyBody);
114+
115+
var isStreamResponse = typeof(TResponse) == typeof(StreamResponse);
116+
117+
using (isStreamResponse ? Stream.Null : responseStream ??= Stream.Null)
118+
{
119+
return await requestData.ConnectionSettings.ProductRegistration.ResponseBuilder
120+
.ToResponseAsync<TResponse>(requestData, _exception, sc, _headers, responseStream, contentType ?? _contentType, body?.Length ?? 0, null, null, cancellationToken)
115121
.ConfigureAwait(false);
122+
}
116123
}
117-
118124
}

src/Elastic.Transport/Elastic.Transport.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
</ItemGroup>
3636

3737
<ItemGroup Condition="'$(TargetFramework)' != 'net8.0'">
38-
<PackageReference Include="System.Text.Json" Version="8.0.4" />
38+
<PackageReference Include="System.Text.Json" Version="8.0.5" />
3939
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="8.0.0" />
4040
</ItemGroup>
4141

0 commit comments

Comments
 (0)