Skip to content

Commit f8e54a0

Browse files
committed
Add support for response builders on IRequestConfiguration
This allows consumers to provide builders on a per-request basis, required in ingest for streaming bulk response. Fixes potential null ref exceptions when disposing streams. This also pins the SDK to 8.x until we're ready for 9.x
1 parent 1fbc364 commit f8e54a0

File tree

10 files changed

+519
-485
lines changed

10 files changed

+519
-485
lines changed

global.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"sdk": {
33
"version": "8.0.100",
4-
"rollForward": "latestMajor",
4+
"rollForward": "latestMinor",
55
"allowPrerelease": false
66
}
77
}

src/Elastic.Transport/Components/Pipeline/RequestData.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
using System.Collections.Specialized;
88
using System.Security.Cryptography.X509Certificates;
99
using Elastic.Transport.Extensions;
10-
using Elastic.Transport.Products;
1110

1211
namespace Elastic.Transport;
1312

@@ -41,7 +40,7 @@ public RequestData(ITransportConfiguration global, IRequestConfiguration? local
4140
ProxyPassword = global.ProxyPassword;
4241
DisableAutomaticProxyDetection = global.DisableAutomaticProxyDetection;
4342
UserAgent = global.UserAgent;
44-
ResponseBuilders = global.ResponseBuilders;
43+
ResponseBuilders = local?.ResponseBuilders ?? global.ResponseBuilders;
4544
ProductResponseBuilders = global.ProductRegistration.ResponseBuilders;
4645

4746
KeepAliveInterval = (int)(global.KeepAliveInterval?.TotalMilliseconds ?? 2000);
@@ -92,11 +91,6 @@ public RequestData(ITransportConfiguration global, IRequestConfiguration? local
9291
}
9392
}
9493

95-
/// <inheritdoc cref="ITransportConfiguration.ResponseBuilders"/>
96-
public IReadOnlyCollection<IResponseBuilder> ProductResponseBuilders { get; }
97-
98-
/// <inheritdoc cref="ITransportConfiguration.ResponseBuilders"/>
99-
public IReadOnlyCollection<IResponseBuilder> ResponseBuilders { get; }
10094

10195
/// <inheritdoc cref="ITransportConfiguration.MemoryStreamFactory"/>
10296
public MemoryStreamFactory MemoryStreamFactory { get; }
@@ -168,4 +162,8 @@ public RequestData(ITransportConfiguration global, IRequestConfiguration? local
168162
public bool DisableSniff { get; }
169163
/// <inheritdoc cref="IRequestConfiguration.DisablePings"/>
170164
public bool DisablePings { get; }
165+
/// <inheritdoc cref="IRequestConfiguration.ResponseBuilders"/>
166+
public IReadOnlyCollection<IResponseBuilder> ProductResponseBuilders { get; }
167+
/// <inheritdoc cref="IRequestConfiguration.ResponseBuilders"/>
168+
public IReadOnlyCollection<IResponseBuilder> ResponseBuilders { get; }
171169
}

src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,8 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
199199
}
200200
else
201201
{
202-
responseStream.Dispose();
203-
receivedResponse.Dispose();
202+
responseStream?.Dispose();
203+
receivedResponse?.Dispose();
204204
}
205205

206206
if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false)))
@@ -218,8 +218,8 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
218218
catch
219219
{
220220
// if there's an exception, ensure we always release the stream and response so that the connection is freed.
221-
responseStream.Dispose();
222-
receivedResponse.Dispose();
221+
responseStream?.Dispose();
222+
receivedResponse?.Dispose();
223223
throw;
224224
}
225225
}

src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,8 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
190190
}
191191
else
192192
{
193-
responseStream.Dispose();
194-
receivedResponse.Dispose();
193+
responseStream?.Dispose();
194+
receivedResponse?.Dispose();
195195
}
196196

197197
if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
@@ -208,8 +208,8 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
208208
catch
209209
{
210210
// if there's an exception, ensure we always release the stream and response so that the connection is freed.
211-
responseStream.Dispose();
212-
receivedResponse.Dispose();
211+
responseStream?.Dispose();
212+
receivedResponse?.Dispose();
213213
throw;
214214
}
215215
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using System;
6+
using System.Collections.Generic;
7+
using System.Collections.Specialized;
8+
using System.Security.Cryptography.X509Certificates;
9+
10+
namespace Elastic.Transport;
11+
12+
/// <summary>
13+
/// Allows you to inject per request overrides to the current <see cref="ITransportConfiguration"/>.
14+
/// </summary>
15+
public interface IRequestConfiguration
16+
{
17+
/// <summary>
18+
/// Force a different Accept header on the request
19+
/// </summary>
20+
string? Accept { get; }
21+
22+
/// <summary>
23+
/// Treat the following statuses (on top of the 200 range) NOT as error.
24+
/// </summary>
25+
IReadOnlyCollection<int>? AllowedStatusCodes { get; }
26+
27+
/// <summary> Provide an authentication header override for this request </summary>
28+
AuthorizationHeader? Authentication { get; }
29+
30+
/// <summary>
31+
/// Use the following client certificates to authenticate this single request
32+
/// </summary>
33+
X509CertificateCollection? ClientCertificates { get; }
34+
35+
/// <summary>
36+
/// Force a different Content-Type header on the request
37+
/// </summary>
38+
string? ContentType { get; }
39+
40+
/// <summary>
41+
/// Whether to buffer the request and response bytes for the call
42+
/// </summary>
43+
bool? DisableDirectStreaming { get; }
44+
45+
/// <summary>
46+
/// Whether to disable the audit trail for the request.
47+
/// </summary>
48+
bool? DisableAuditTrail { get; }
49+
50+
/// <summary>
51+
/// Under no circumstance do a ping before the actual call. If a node was previously dead a small ping with
52+
/// low connect timeout will be tried first in normal circumstances
53+
/// </summary>
54+
bool? DisablePings { get; }
55+
56+
/// <summary>
57+
/// Forces no sniffing to occur on the request no matter what configuration is in place
58+
/// globally
59+
/// </summary>
60+
bool? DisableSniff { get; }
61+
62+
/// <summary>
63+
/// Whether or not this request should be pipelined. http://en.wikipedia.org/wiki/HTTP_pipelining defaults to true
64+
/// </summary>
65+
bool? HttpPipeliningEnabled { get; }
66+
67+
/// <summary>
68+
/// Enable gzip compressed requests and responses
69+
/// </summary>
70+
bool? EnableHttpCompression { get; }
71+
72+
/// <summary>
73+
/// This will force the operation on the specified node, this will bypass any configured connection pool and will no retry.
74+
/// </summary>
75+
Uri? ForceNode { get; }
76+
77+
/// <summary>
78+
/// When a retryable exception occurs or status code is returned this controls the maximum
79+
/// amount of times we should retry the call to Elasticsearch
80+
/// </summary>
81+
int? MaxRetries { get; }
82+
83+
/// <summary>
84+
/// Limits the total runtime including retries separately from <see cref="IRequestConfiguration.RequestTimeout" />
85+
/// <pre>
86+
/// When not specified defaults to <see cref="IRequestConfiguration.RequestTimeout" /> which itself defaults to 60 seconds
87+
/// </pre>
88+
/// </summary>
89+
TimeSpan? MaxRetryTimeout { get; }
90+
91+
/// <summary>
92+
/// Associate an Id with this user-initiated task, such that it can be located in the cluster task list.
93+
/// Valid only for Elasticsearch 6.2.0+
94+
/// </summary>
95+
string? OpaqueId { get; }
96+
97+
/// <summary> Determines whether to parse all HTTP headers in the request. </summary>
98+
bool? ParseAllHeaders { get; }
99+
100+
/// <summary>
101+
/// The ping timeout for this specific request
102+
/// </summary>
103+
TimeSpan? PingTimeout { get; }
104+
105+
/// <summary>
106+
/// The timeout for this specific request, takes precedence over the global timeout init
107+
/// </summary>
108+
TimeSpan? RequestTimeout { get; }
109+
110+
/// <summary>
111+
/// Additional response builders to apply.
112+
/// </summary>
113+
IReadOnlyCollection<IResponseBuilder> ResponseBuilders { get; }
114+
115+
/// <summary> Specifies the headers from the response that should be parsed. </summary>
116+
HeadersList? ResponseHeadersToParse { get; }
117+
118+
/// <summary>
119+
/// Submit the request on behalf in the context of a different shield user
120+
/// <pre />https://www.elastic.co/guide/en/shield/current/submitting-requests-for-other-users.html
121+
/// </summary>
122+
string? RunAs { get; }
123+
124+
/// <summary>
125+
/// Instead of following a c/go like error checking on response.IsValid do throw an exception (except when <see cref="ApiCallDetails.SuccessOrKnownError"/> is false)
126+
/// on the client when a call resulted in an exception on either the client or the Elasticsearch server.
127+
/// <para>Reasons for such exceptions could be search parser errors, index missing exceptions, etc...</para>
128+
/// </summary>
129+
bool? ThrowExceptions { get; }
130+
131+
/// <summary>
132+
/// Whether the request should be sent with chunked Transfer-Encoding.
133+
/// </summary>
134+
bool? TransferEncodingChunked { get; }
135+
136+
/// <summary>
137+
/// Try to send these headers for this single request
138+
/// </summary>
139+
NameValueCollection? Headers { get; }
140+
141+
/// <summary>
142+
/// Enable statistics about TCP connections to be collected when making a request
143+
/// </summary>
144+
bool? EnableTcpStats { get; }
145+
146+
/// <summary>
147+
/// Enable statistics about thread pools to be collected when making a request
148+
/// </summary>
149+
bool? EnableThreadPoolStats { get; }
150+
151+
/// <summary>
152+
/// Holds additional meta data about the request.
153+
/// </summary>
154+
RequestMetaData? RequestMetaData { get; }
155+
}

src/Elastic.Transport/Configuration/ITransportConfiguration.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -210,9 +210,4 @@ public interface ITransportConfiguration : IRequestConfiguration, IDisposable
210210
/// about the client and runtime.
211211
/// </summary>
212212
bool DisableMetaHeader { get; }
213-
214-
/// <summary>
215-
/// Additional response builders to apply.
216-
/// </summary>
217-
IReadOnlyCollection<IResponseBuilder> ResponseBuilders { get; }
218213
}

0 commit comments

Comments
 (0)