Skip to content

Commit 27cd6b1

Browse files
committed
Update tests and fix post data missed disposal
1 parent 29bae46 commit 27cd6b1

File tree

4 files changed

+103
-87
lines changed

4 files changed

+103
-87
lines changed

src/Elastic.Transport/Components/Pipeline/DefaultResponseBuilder.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -230,10 +230,7 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
230230
if (details.HttpStatusCode.HasValue &&
231231
requestData.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
232232
{
233-
// In this scenario, we always dispose as we've explicitly skipped reading the response
234-
if (ownsStream)
235-
responseStream.Dispose();
236-
233+
ConditionalDisposal(responseStream, ownsStream, response);
237234
return null;
238235
}
239236

@@ -296,7 +293,6 @@ private async ValueTask<TResponse> SetBodyCoreAsync<TResponse>(bool isAsync,
296293
{
297294
// Note the exception this handles is ONLY thrown after a check if the stream length is zero.
298295
// When the length is zero, `default` is returned by Deserialize(Async) instead.
299-
300296
ConditionalDisposal(responseStream, ownsStream, response);
301297
return default;
302298
}

src/Elastic.Transport/Requests/Body/PostData.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ protected void FinishStream(Stream writableStream, MemoryStream buffer, ITranspo
111111
buffer.Position = 0;
112112
buffer.CopyTo(writableStream, BufferSize);
113113
WrittenBytes ??= buffer.ToArray();
114+
buffer.Dispose();
114115
}
115116

116117
/// <summary>
@@ -132,5 +133,10 @@ protected async
132133
buffer.Position = 0;
133134
await buffer.CopyToAsync(writableStream, BufferSize, ctx).ConfigureAwait(false);
134135
WrittenBytes ??= buffer.ToArray();
136+
#if NET
137+
await buffer.DisposeAsync().ConfigureAwait(false);
138+
#else
139+
buffer.Dispose();
140+
#endif
135141
}
136142
}

tests/Elastic.Transport.IntegrationTests/Http/StreamResponseTests.cs

Lines changed: 78 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
using System.Collections.Generic;
66
using System.IO;
77
using System.Linq;
8-
using System.Text.Json;
8+
using System.Text;
99
using System.Threading.Tasks;
1010
using Elastic.Transport.IntegrationTests.Plumbing;
1111
using Elastic.Transport.Products.Elasticsearch;
@@ -33,23 +33,6 @@ public async Task StreamResponse_ShouldNotBeDisposed()
3333
_ = sr.ReadToEndAsync();
3434
}
3535

36-
//[Fact]
37-
//public async Task StreamResponse_MemoryStreamShouldNotBeDisposed()
38-
//{
39-
// var nodePool = new SingleNodePool(Server.Uri);
40-
// var memoryStreamFactory = new TrackMemoryStreamFactory();
41-
// var config = new TransportConfiguration(nodePool, productRegistration: new ElasticsearchProductRegistration(typeof(Clients.Elasticsearch.ElasticsearchClient)))
42-
// .MemoryStreamFactory(memoryStreamFactory);
43-
44-
// var transport = new DistributedTransport(config);
45-
46-
// _ = await transport.PostAsync<StreamResponse>(Path, PostData.String("{}"));
47-
48-
// var memoryStream = memoryStreamFactory.Created.Last();
49-
50-
// memoryStream.IsDisposed.Should().BeFalse();
51-
//}
52-
5336
[Fact]
5437
public async Task StreamResponse_MemoryStreamShouldNotBeDisposed()
5538
{
@@ -63,9 +46,9 @@ public async Task StreamResponse_MemoryStreamShouldNotBeDisposed()
6346

6447
_ = await transport.PostAsync<StreamResponse>(Path, PostData.String("{}"));
6548

66-
var memoryStream = memoryStreamFactory.Created.Last();
67-
68-
memoryStream.IsDisposed.Should().BeFalse();
49+
// When disable direct streaming, we have 1 for the original content, 1 for the buffered request bytes and the last for the buffered response
50+
memoryStreamFactory.Created.Count.Should().Be(3);
51+
memoryStreamFactory.Created.Last().IsDisposed.Should().BeFalse();
6952
}
7053

7154
[Fact]
@@ -80,13 +63,36 @@ public async Task StringResponse_MemoryStreamShouldBeDisposed()
8063

8164
_ = await transport.PostAsync<StringResponse>(Path, PostData.String("{}"));
8265

83-
var memoryStream = memoryStreamFactory.Created.Last();
66+
memoryStreamFactory.Created.Count.Should().Be(2);
67+
foreach (var memoryStream in memoryStreamFactory.Created)
68+
{
69+
memoryStream.IsDisposed.Should().BeTrue();
70+
}
71+
}
72+
73+
[Fact]
74+
public async Task WhenInvalidJson_MemoryStreamShouldBeDisposed()
75+
{
76+
var nodePool = new SingleNodePool(Server.Uri);
77+
var memoryStreamFactory = new TrackMemoryStreamFactory();
78+
var config = new TransportConfiguration(nodePool, productRegistration: new ElasticsearchProductRegistration(typeof(Clients.Elasticsearch.ElasticsearchClient)))
79+
.MemoryStreamFactory(memoryStreamFactory)
80+
.DisableDirectStreaming(true);
81+
82+
var transport = new DistributedTransport(config);
83+
84+
var payload = new Payload { ResponseJsonString = " " };
85+
_ = await transport.PostAsync<TestResponse>(Path, PostData.Serializable(payload));
8486

85-
memoryStream.IsDisposed.Should().BeTrue();
87+
memoryStreamFactory.Created.Count.Should().Be(3);
88+
foreach (var memoryStream in memoryStreamFactory.Created)
89+
{
90+
memoryStream.IsDisposed.Should().BeTrue();
91+
}
8692
}
8793

8894
[Fact]
89-
public async Task Response_MemoryStreamShouldBeDisposed()
95+
public async Task WhenNoContent_MemoryStreamShouldBeDisposed()
9096
{
9197
var nodePool = new SingleNodePool(Server.Uri);
9298
var memoryStreamFactory = new TrackMemoryStreamFactory();
@@ -95,11 +101,37 @@ public async Task Response_MemoryStreamShouldBeDisposed()
95101

96102
var transport = new DistributedTransport(config);
97103

98-
_ = await transport.PostAsync<TestResponse>(Path, PostData.String("{}"));
104+
var payload = new Payload { ResponseJsonString = "", StatusCode = 204 };
105+
_ = await transport.PostAsync<TestResponse>(Path, PostData.Serializable(payload));
99106

100-
var memoryStream = memoryStreamFactory.Created.Last();
107+
// We expect one for sending the request payload, but as the response is 204, we shouldn't
108+
// see other memory streams being created for the response.
109+
memoryStreamFactory.Created.Count.Should().Be(1);
110+
foreach (var memoryStream in memoryStreamFactory.Created)
111+
{
112+
memoryStream.IsDisposed.Should().BeTrue();
113+
}
114+
}
115+
116+
[Fact]
117+
public async Task PlainText_MemoryStreamShouldBeDisposed()
118+
{
119+
var nodePool = new SingleNodePool(Server.Uri);
120+
var memoryStreamFactory = new TrackMemoryStreamFactory();
121+
var config = new TransportConfiguration(nodePool, productRegistration: new ElasticsearchProductRegistration(typeof(Clients.Elasticsearch.ElasticsearchClient)))
122+
.MemoryStreamFactory(memoryStreamFactory)
123+
.DisableDirectStreaming(true);
124+
125+
var transport = new DistributedTransport(config);
126+
127+
var payload = new Payload { ResponseJsonString = "text", ContentType = "text/plain" };
128+
_ = await transport.PostAsync<TestResponse>(Path, PostData.Serializable(payload));
101129

102-
memoryStream.IsDisposed.Should().BeTrue();
130+
memoryStreamFactory.Created.Count.Should().Be(3);
131+
foreach (var memoryStream in memoryStreamFactory.Created)
132+
{
133+
memoryStream.IsDisposed.Should().BeTrue();
134+
}
103135
}
104136

105137
private class TestResponse : TransportResponse
@@ -150,9 +182,27 @@ public override MemoryStream Create(byte[] bytes, int index, int count)
150182
}
151183
}
152184

185+
public class Payload
186+
{
187+
public string ResponseJsonString { get; set; } = "{}";
188+
public string ContentType { get; set; } = "application/json";
189+
public int StatusCode { get; set; } = 200;
190+
}
191+
153192
[ApiController, Route("[controller]")]
154193
public class StreamResponseController : ControllerBase
155194
{
156195
[HttpPost]
157-
public Task<JsonElement> Post([FromBody] JsonElement body) => Task.FromResult(body);
196+
public async Task<ActionResult> Post([FromBody] Payload payload)
197+
{
198+
Response.ContentType = payload.ContentType;
199+
200+
if (payload.StatusCode != 204)
201+
{
202+
await Response.BodyWriter.WriteAsync(Encoding.UTF8.GetBytes(payload.ResponseJsonString));
203+
await Response.BodyWriter.CompleteAsync();
204+
}
205+
206+
return StatusCode(payload.StatusCode);
207+
}
158208
}

tests/Elastic.Transport.Tests/ResponseBuilderDisposeTests.cs

Lines changed: 18 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -29,66 +29,34 @@ public async Task StreamResponseWithPotentialBodyAndDisableDirectStreaming_Memor
2929
await AssertResponse<StreamResponse>(true, expectedDisposed: false);
3030

3131
[Fact]
32-
public async Task StreamResponseWith204StatusCode_MemoryStreamIsDisposed() =>
33-
await AssertResponse<StreamResponse>(true, 204);
32+
public async Task ResponseWithPotentialBodyButInvalidMimeType_MemoryStreamIsDisposed() =>
33+
await AssertResponse<TestResponse>(true, mimeType: "application/not-valid", expectedDisposed: true);
3434

3535
[Fact]
36-
public async Task StreamResponseForHeadRequest_StreamIsDisposed() =>
37-
await AssertResponse<StreamResponse>(false, httpMethod: HttpMethod.HEAD);
36+
public async Task ResponseWithPotentialBodyButSkippedStatusCode_MemoryStreamIsDisposed() =>
37+
await AssertResponse<TestResponse>(true, skipStatusCode: 200, expectedDisposed: true);
3838

3939
[Fact]
40-
public async Task StreamResponseWithZeroContentLength_StreamIsDisposed() =>
41-
await AssertResponse<StreamResponse>(false, contentLength: 0);
42-
43-
[Fact]
44-
public async Task ResponseWithPotentialBody_StreamIsDisposed() =>
45-
await AssertResponse<TestResponse>(false, expectedDisposed: true);
46-
47-
[Fact]
48-
public async Task ResponseWithPotentialBodyButInvalidMimeType_StreamIsDisposed() =>
49-
await AssertResponse<TestResponse>(false, mimeType: "application/not-valid", expectedDisposed: true);
50-
51-
[Fact]
52-
public async Task ResponseWithPotentialBodyButSkippedStatusCode_StreamIsDisposed() =>
53-
await AssertResponse<TestResponse>(false, skipStatusCode: 200, expectedDisposed: true);
54-
55-
[Fact]
56-
public async Task ResponseWithPotentialBodyButEmptyJson_StreamIsDisposed() =>
57-
await AssertResponse<TestResponse>(false, responseJson: " ", expectedDisposed: true);
40+
public async Task ResponseWithPotentialBodyButEmptyJson_MemoryStreamIsDisposed() =>
41+
await AssertResponse<TestResponse>(true, responseJson: " ", expectedDisposed: true);
5842

5943
[Fact]
6044
// NOTE: The empty string here hits a fast path in STJ which returns default if the stream length is zero.
61-
public async Task ResponseWithPotentialBodyButNullResponseDuringDeserialization_StreamIsDisposed() =>
62-
await AssertResponse<TestResponse>(false, responseJson: "", expectedDisposed: true);
45+
public async Task ResponseWithPotentialBodyButNullResponseDuringDeserialization_MemoryStreamIsDisposed() =>
46+
await AssertResponse<TestResponse>(true, responseJson: "", expectedDisposed: true);
6347

6448
[Fact]
65-
public async Task ResponseWithPotentialBodyAndCustomResponseBuilder_StreamIsDisposed() =>
66-
await AssertResponse<TestResponse>(false, customResponseBuilder: new TestCustomResponseBuilder(), expectedDisposed: true);
49+
public async Task ResponseWithPotentialBodyAndCustomResponseBuilder_MemoryStreamIsDisposed() =>
50+
await AssertResponse<TestResponse>(true, customResponseBuilder: new TestCustomResponseBuilder(), expectedDisposed: true);
6751

6852
[Fact]
6953
// NOTE: We expect one memory stream factory creation when handling error responses
7054
public async Task ResponseWithPotentialBodyAndErrorResponse_StreamIsDisposed() =>
71-
await AssertResponse<TestResponse>(false, productRegistration: new TestProductRegistration(), expectedDisposed: true, memoryStreamCreateExpected: 1);
72-
73-
[Fact]
74-
public async Task ResponseWithPotentialBodyAndDisableDirectStreaming_MemoryStreamIsDisposed() =>
75-
await AssertResponse<TestResponse>(true, expectedDisposed: true);
76-
77-
[Fact]
78-
public async Task ResponseWith204StatusCode_StreamIsDisposed() =>
79-
await AssertResponse<TestResponse>(false, 204);
80-
81-
[Fact]
82-
public async Task ResponseForHeadRequest_StreamIsDisposed() =>
83-
await AssertResponse<TestResponse>(false, httpMethod: HttpMethod.HEAD);
84-
85-
[Fact]
86-
public async Task ResponseWithZeroContentLength_StreamIsDisposed() =>
87-
await AssertResponse<TestResponse>(false, contentLength: 0);
55+
await AssertResponse<TestResponse>(true, productRegistration: new TestProductRegistration(), expectedDisposed: true);
8856

8957
[Fact]
9058
public async Task StringResponseWithPotentialBodyAndDisableDirectStreaming_MemoryStreamIsDisposed() =>
91-
await AssertResponse<StringResponse>(true, expectedDisposed: true, memoryStreamCreateExpected: 1);
59+
await AssertResponse<StringResponse>(false, expectedDisposed: true, memoryStreamCreateExpected: 1);
9260

9361
private async Task AssertResponse<T>(bool disableDirectStreaming, int statusCode = 200, HttpMethod httpMethod = HttpMethod.GET, int contentLength = 10,
9462
bool expectedDisposed = true, string mimeType = "application/json", string responseJson = "{}", int skipStatusCode = -1,
@@ -139,13 +107,11 @@ private async Task AssertResponse<T>(bool disableDirectStreaming, int statusCode
139107
if (disableDirectStreaming)
140108
{
141109
var memoryStream = memoryStreamFactory.Created[0];
142-
stream.IsDisposed.Should().BeTrue();
143110
memoryStream.IsDisposed.Should().Be(expectedDisposed);
144111
}
145-
else
146-
{
147-
stream.IsDisposed.Should().Be(expectedDisposed);
148-
}
112+
113+
// The latest implementation should never dispose the incoming stream and assumes the caller will handler disposal
114+
stream.IsDisposed.Should().Be(false);
149115

150116
stream = new TrackDisposeStream();
151117
var ct = new CancellationToken();
@@ -159,13 +125,11 @@ private async Task AssertResponse<T>(bool disableDirectStreaming, int statusCode
159125
if (disableDirectStreaming)
160126
{
161127
var memoryStream = memoryStreamFactory.Created[0];
162-
stream.IsDisposed.Should().BeTrue();
163128
memoryStream.IsDisposed.Should().Be(expectedDisposed);
164129
}
165-
else
166-
{
167-
stream.IsDisposed.Should().Be(expectedDisposed);
168-
}
130+
131+
// The latest implementation should never dispose the incoming stream and assumes the caller will handler disposal
132+
stream.IsDisposed.Should().Be(false);
169133
}
170134

171135
private class TestProductRegistration : ProductRegistration

0 commit comments

Comments
 (0)