Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 90 additions & 42 deletions Directory.Build.targets

Large diffs are not rendered by default.

155 changes: 148 additions & 7 deletions src/Lucene.Net.Replicator/Http/HttpClientBase.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using Lucene.Net.Diagnostics;
using Lucene.Net.Support;
using System;
using System.IO;
Expand All @@ -7,6 +6,9 @@
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
// ReSharper disable VirtualMemberNeverOverridden.Global
// ReSharper disable MemberCanBePrivate.Global

#nullable enable

namespace Lucene.Net.Replicator.Http
Expand Down Expand Up @@ -173,20 +175,40 @@ protected virtual void ThrowKnownError(HttpResponseMessage response)
}

/// <summary>
/// <b>Internal:</b> Execute a request and return its result.
/// <b>Internal:</b> Execute a POST request with custom HttpContent and return its result.
/// The <paramref name="parameters"/> argument is treated as: name1,value1,name2,value2,...
/// </summary>
protected virtual HttpResponseMessage ExecutePost(string request, HttpContent content, params string[]? parameters)
{
EnsureOpen();

var req = new HttpRequestMessage(HttpMethod.Post, QueryString(request, parameters));

req.Content = content;
var req = new HttpRequestMessage(HttpMethod.Post, QueryString(request, parameters))
{
Content = content
};

return Execute(req);
}

/// <summary>
/// <b>Internal:</b> Execute a POST request asynchronously with custom HttpContent.
/// The <paramref name="parameters"/> argument is treated as: name1,value1,name2,value2,...
/// </summary>
protected virtual async Task<HttpResponseMessage> ExecutePostAsync(string request, HttpContent content, params string[]? parameters)
{
EnsureOpen();

var req = new HttpRequestMessage(HttpMethod.Post, QueryString(request, parameters))
{
Content = content
};

var resp = await httpc.SendAsync(req).ConfigureAwait(false); // Async call
VerifyStatus(resp);
return resp;
}


/// <summary>
/// <b>Internal:</b> Execute a request and return its result.
/// The <paramref name="parameters"/> argument is treated as: name1,value1,name2,value2,...
Expand All @@ -200,6 +222,36 @@ protected virtual HttpResponseMessage ExecuteGet(string request, params string[]
return Execute(req);
}

/// <summary>
/// Execute a GET request asynchronously with an array of parameters.
/// </summary>
protected Task<HttpResponseMessage> ExecuteGetAsync(string action, string[]? parameters, CancellationToken cancellationToken)
{
EnsureOpen();
var url = QueryString(action, parameters);
return httpc.GetAsync(url, cancellationToken);
}

/// <summary>
/// Execute a GET request asynchronously with up to 3 name/value parameters.
/// </summary>
protected Task<HttpResponseMessage> ExecuteGetAsync(
string action,
string param1, string value1,
string? param2 = null, string? value2 = null,
string? param3 = null, string? value3 = null,
CancellationToken cancellationToken = default)
{
EnsureOpen();
var url = (param2 == null && param3 == null)
? QueryString(action, param1, value1)
: QueryString(action,
param1, value1,
param2 ?? string.Empty, value2 ?? string.Empty,
param3 ?? string.Empty, value3 ?? string.Empty);
return httpc.GetAsync(url, cancellationToken);
}

private HttpResponseMessage Execute(HttpRequestMessage request)
{
//.NET Note: Bridging from Async to Sync, this is not ideal and we could consider changing the interface to be Async or provide Async overloads
Expand Down Expand Up @@ -255,7 +307,11 @@ public virtual Stream GetResponseStream(HttpResponseMessage response) // LUCENEN
/// <exception cref="IOException"></exception>
public virtual Stream GetResponseStream(HttpResponseMessage response, bool consume) // LUCENENET: This was ResponseInputStream in Lucene
{
#if FEATURE_HTTPCONTENT_READASSTREAM
Stream result = response.Content.ReadAsStream();
#else
Stream result = response.Content.ReadAsStreamAsync().ConfigureAwait(false).GetAwaiter().GetResult();
#endif

if (consume)
{
Expand All @@ -265,6 +321,37 @@ public virtual Stream GetResponseStream(HttpResponseMessage response, bool consu
return result;
}

/// <summary>
/// Internal utility: input stream of the provided response asynchronously.
/// </summary>
/// <exception cref="IOException"></exception>
public virtual async Task<Stream> GetResponseStreamAsync(HttpResponseMessage response, CancellationToken cancellationToken = default)
{
#if FEATURE_HTTPCONTENT_READASSTREAM_CANCELLATIONTOKEN
Stream result = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
#else
Stream result = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
#endif
return result;
}

/// <summary>
/// Internal utility: input stream of the provided response asynchronously, which optionally
/// consumes the response's resources when the input stream is exhausted.
/// </summary>
/// <exception cref="IOException"></exception>
public virtual async Task<Stream> GetResponseStreamAsync(HttpResponseMessage response, bool consume, CancellationToken cancellationToken = default)
{
#if FEATURE_HTTPCONTENT_READASSTREAM_CANCELLATIONTOKEN
Stream result = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
#else
Stream result = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
#endif
if (consume)
result = new ConsumingStream(result);
return result;
}

/// <summary>
/// Returns <c>true</c> if this instance was <see cref="Dispose(bool)"/>ed, otherwise
/// returns <c>false</c>. Note that if you override <see cref="Dispose(bool)"/>, you must call
Expand Down Expand Up @@ -310,11 +397,65 @@ protected virtual T DoAction<T>(HttpResponseMessage response, bool consume, Func
}
}
}
if (Debugging.AssertsEnabled) Debugging.Assert(th != null); // extra safety - if we get here, it means the Func<T> failed

// if (Debugging.AssertsEnabled) Debugging.Assert(th != null); // LUCENENET: Removed assertion because it'll never be null here, ensured by NRT
Util.IOUtils.ReThrow(th);
return default!; // silly, if we're here, IOUtils.reThrow always throws an exception
}

/// <summary>
/// Do a specific async action and validate after the action that the status is still OK,
/// and if not, attempt to extract the actual server side exception. Optionally
/// release the response at exit, depending on <paramref name="consume"/> parameter.
/// </summary>
protected virtual async Task<T> DoActionAsync<T>(HttpResponseMessage response, bool consume, Func<Task<T>> call)
{
Exception? th /* = null */;
try
{
VerifyStatus(response);
return await call().ConfigureAwait(false);
}
catch (Exception t) when (t.IsThrowable())
{
th = t;
}
finally
{
try
{
VerifyStatus(response);
}
finally
{
if (consume)
{
try
{
ConsumeQuietly(response);
}
catch
{
// ignore on purpose
}
}
}
}

// if (Debugging.AssertsEnabled) Debugging.Assert(th != null); // LUCENENET: Removed assertion because it'll never be null here, ensured by NRT
Util.IOUtils.ReThrow(th);
return default!; // never reached, rethrow above always throws
}

/// <summary>
/// Calls the overload <see cref="DoActionAsync{T}(HttpResponseMessage, bool, Func{Task{T}})"/> passing <c>true</c> to consume.
/// </summary>
protected virtual Task<T> DoActionAsync<T>(HttpResponseMessage response, Func<Task<T>> call)
{
return DoActionAsync(response, true, call);
}


/// <summary>
/// Disposes this <see cref="HttpClientBase"/>.
/// When called with <code>true</code>, this disposes the underlying <see cref="HttpClient"/>.
Expand Down Expand Up @@ -357,7 +498,7 @@ private static void ConsumeQuietly(HttpResponseMessage response)
private class ConsumingStream : Stream
{
private readonly Stream input;
private bool consumed = false;
private bool consumed /* = false */;

public ConsumingStream(Stream input)
{
Expand Down
104 changes: 100 additions & 4 deletions src/Lucene.Net.Replicator/Http/HttpReplicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
using System;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

#nullable enable

namespace Lucene.Net.Replicator.Http
Expand Down Expand Up @@ -29,14 +32,14 @@ namespace Lucene.Net.Replicator.Http
/// <remarks>
/// @lucene.experimental
/// </remarks>
public class HttpReplicator : HttpClientBase, IReplicator
public class HttpReplicator : HttpClientBase, IReplicator, IAsyncReplicator
{
/// <summary>
/// Creates a new <see cref="HttpReplicator"/> with the given host, port and path.
/// <see cref="HttpClientBase(string, int, string, HttpMessageHandler)"/> for more details.
/// </summary>
public HttpReplicator(string host, int port, string path, HttpMessageHandler? messageHandler = null)
: base(host, port, path, messageHandler)
: base(host, port, path, messageHandler ?? new HttpClientHandler())
{
}

Expand Down Expand Up @@ -66,12 +69,14 @@ public HttpReplicator(string url, HttpClient client)
public virtual SessionToken? CheckForUpdate(string? currentVersion)
{
string[]? parameters = null;
if (currentVersion != null)

if (!string.IsNullOrEmpty(currentVersion))
{
parameters = new[] { ReplicationService.REPLICATE_VERSION_PARAM, currentVersion };
parameters = new[] { ReplicationService.REPLICATE_VERSION_PARAM, currentVersion! }; // [!]: verified above
}

var response = base.ExecuteGet(nameof(ReplicationService.ReplicationAction.UPDATE), parameters);

return DoAction(response, () =>
{
using var inputStream = new DataInputStream(GetResponseStream(response));
Expand Down Expand Up @@ -109,5 +114,96 @@ public virtual void Release(string sessionId)
// do not remove this call: as it is still validating for us!
DoAction<object?>(response, () => null);
}

#region Async methods (IAsyncReplicator)

/// <summary>
/// Checks for updates at the remote host asynchronously.
/// </summary>
/// <param name="currentVersion">The current index version.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>
/// A <see cref="SessionToken"/> if updates are available; otherwise, <c>null</c>.
/// </returns>
public async Task<SessionToken?> CheckForUpdateAsync(string? currentVersion, CancellationToken cancellationToken = default)
{
string[]? parameters = !string.IsNullOrEmpty(currentVersion)
? new[] { ReplicationService.REPLICATE_VERSION_PARAM, currentVersion! } // [!]: verified above
: null;

using var response = await ExecuteGetAsync(
nameof(ReplicationService.ReplicationAction.UPDATE),
parameters,
cancellationToken: cancellationToken).ConfigureAwait(false);

return await DoActionAsync(response, async () =>
{
// ReSharper disable once AccessToDisposedClosure - DoActionAsync definitively returns after this lambda is invoked
using var inputStream = new DataInputStream(
await GetResponseStreamAsync(response, cancellationToken).ConfigureAwait(false));

return inputStream.ReadByte() == 0 ? null : new SessionToken(inputStream);
}).ConfigureAwait(false);
}

/// <summary>
/// Obtains the given file from the remote host asynchronously.
/// </summary>
/// <param name="sessionId">The session ID.</param>
/// <param name="source">The source of the file.</param>
/// <param name="fileName">The file name.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A <see cref="Stream"/> of the requested file.</returns>
public async Task<Stream> ObtainFileAsync(string sessionId, string source, string fileName, CancellationToken cancellationToken = default)
{
using var response = await ExecuteGetAsync(
nameof(ReplicationService.ReplicationAction.OBTAIN),
ReplicationService.REPLICATE_SESSION_ID_PARAM, sessionId,
ReplicationService.REPLICATE_SOURCE_PARAM, source,
ReplicationService.REPLICATE_FILENAME_PARAM, fileName,
cancellationToken: cancellationToken).ConfigureAwait(false);

return await DoActionAsync(response,
// ReSharper disable once AccessToDisposedClosure - DoActionAsync definitively returns after this lambda is invoked
async () => await GetResponseStreamAsync(response, cancellationToken).ConfigureAwait(false))
.ConfigureAwait(false);
}

/// <summary>
/// Publishes a new <see cref="IRevision"/> asynchronously.
/// Not supported in this implementation.
/// </summary>
/// <param name="revision">The revision to publish.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A <see cref="Task"/> representing the operation.</returns>
/// <exception cref="NotSupportedException">Always thrown.</exception>
public Task PublishAsync(IRevision revision, CancellationToken cancellationToken = default)
{
throw UnsupportedOperationException.Create(
"this replicator implementation does not support remote publishing of revisions");
}

/// <summary>
/// Releases the session at the remote host asynchronously.
/// </summary>
/// <param name="sessionId">The session ID to release.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A <see cref="Task"/> representing the operation.</returns>
public async Task ReleaseAsync(string sessionId, CancellationToken cancellationToken = default)
{
using var response = await ExecuteGetAsync(
nameof(ReplicationService.ReplicationAction.RELEASE),
ReplicationService.REPLICATE_SESSION_ID_PARAM, sessionId,
cancellationToken: cancellationToken).ConfigureAwait(false);

await DoActionAsync(response, () =>
{
// No actual response content needed — just verification
return Task.FromResult<object?>(null);
}).ConfigureAwait(false);
}

#endregion

}
}
Loading