Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unobserved exceptions with retries #2255

Merged
merged 12 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Grpc.Net.Client/Internal/GrpcCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ private async Task RunCall(HttpRequestMessage request, TimeSpan? timeout)
if (_responseTcs != null)
{
_responseTcs.TrySetException(resolvedException);

// Always observe cancellation-like exceptions.
if (IsCancellationOrDeadlineException(ex))
{
Expand Down
21 changes: 20 additions & 1 deletion src/Grpc.Net.Client/Internal/HttpClientCallInvoker.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#region Copyright notice and license
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
Expand Down Expand Up @@ -43,6 +43,8 @@ public HttpClientCallInvoker(GrpcChannel channel)
/// </summary>
public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options)
{
AssertMethodType(method, MethodType.ClientStreaming);

var call = CreateRootGrpcCall<TRequest, TResponse>(Channel, method, options);
call.StartClientStreaming();

Expand All @@ -67,6 +69,8 @@ public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreami
/// </summary>
public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options)
{
AssertMethodType(method, MethodType.DuplexStreaming);

var call = CreateRootGrpcCall<TRequest, TResponse>(Channel, method, options);
call.StartDuplexStreaming();

Expand All @@ -90,6 +94,8 @@ public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreami
/// </summary>
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options, TRequest request)
{
AssertMethodType(method, MethodType.ServerStreaming);

var call = CreateRootGrpcCall<TRequest, TResponse>(Channel, method, options);
call.StartServerStreaming(request);

Expand All @@ -111,6 +117,8 @@ public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRe
/// </summary>
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options, TRequest request)
{
AssertMethodType(method, MethodType.Unary);

var call = CreateRootGrpcCall<TRequest, TResponse>(Channel, method, options);
call.StartUnary(request);

Expand All @@ -127,6 +135,17 @@ public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Me
return callWrapper;
}

private static void AssertMethodType(IMethod method, MethodType methodType)
{
JamesNK marked this conversation as resolved.
Show resolved Hide resolved
// This can be used to assert tests are passing the right method type.
#if ASSERT_METHOD_TYPE
if (method.Type != methodType)
{
throw new Exception("Expected method type: " + methodType);
}
#endif
}

/// <summary>
/// Invokes a simple remote call in a blocking fashion.
/// </summary>
Expand Down
13 changes: 11 additions & 2 deletions src/Grpc.Net.Client/Internal/Retry/HedgingCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,19 @@ private async Task StartCall(Action<GrpcCall<TRequest, TResponse>> startCallFunc
{
if (CommitedCallTask.IsCompletedSuccessfully() && CommitedCallTask.Result == call)
{
// Ensure response task is created before waiting to the end.
// Allows cancellation exceptions to be observed in cleanup.
if (!HasResponseStream())
{
_ = GetResponseAsync();
}

// Wait until the commited call is finished and then clean up hedging call.
// Force yield here to prevent continuation running with any locks.
await CompatibilityHelpers.AwaitWithYieldAsync(call.CallTask).ConfigureAwait(false);
Cleanup();
var status = await CompatibilityHelpers.AwaitWithYieldAsync(call.CallTask).ConfigureAwait(false);

var observeExceptions = status.StatusCode is StatusCode.Cancelled or StatusCode.DeadlineExceeded;
Cleanup(observeExceptions);
}
}
}
Expand Down
19 changes: 14 additions & 5 deletions src/Grpc.Net.Client/Internal/Retry/RetryCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private async Task StartRetry(Action<GrpcCall<TRequest, TResponse>> startCallFun
// Commited so exit retry loop.
return;
}
else if (IsSuccessfulStreamingCall(responseStatus.Value, currentCall))
else if (IsSuccessfulStreamingCall(responseStatus.Value))
{
// Headers were returned. We're commited.
CommitCall(currentCall, CommitReason.ResponseHeadersReceived);
Expand Down Expand Up @@ -252,25 +252,34 @@ private async Task StartRetry(Action<GrpcCall<TRequest, TResponse>> startCallFun
{
if (CommitedCallTask.Result is GrpcCall<TRequest, TResponse> call)
{
// Ensure response task is created before waiting to the end.
// Allows cancellation exceptions to be observed in cleanup.
if (!HasResponseStream())
{
_ = GetResponseAsync();
}

// Wait until the commited call is finished and then clean up retry call.
// Force yield here to prevent continuation running with any locks.
await CompatibilityHelpers.AwaitWithYieldAsync(call.CallTask).ConfigureAwait(false);
Cleanup();
var status = await CompatibilityHelpers.AwaitWithYieldAsync(call.CallTask).ConfigureAwait(false);

var observeExceptions = status.StatusCode is StatusCode.Cancelled or StatusCode.DeadlineExceeded;
Cleanup(observeExceptions);
}
}

Log.StoppingRetryWorker(Logger);
}
}

private static bool IsSuccessfulStreamingCall(Status responseStatus, GrpcCall<TRequest, TResponse> call)
private bool IsSuccessfulStreamingCall(Status responseStatus)
{
if (responseStatus.StatusCode != StatusCode.OK)
{
return false;
}

return call.Method.Type == MethodType.ServerStreaming || call.Method.Type == MethodType.DuplexStreaming;
return HasResponseStream();
}

protected override void OnCommitCall(IGrpcCall<TRequest, TResponse> call)
Expand Down
44 changes: 39 additions & 5 deletions src/Grpc.Net.Client/Internal/Retry/RetryCallBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ internal abstract partial class RetryCallBase<TRequest, TResponse> : IGrpcCall<T
private readonly TaskCompletionSource<IGrpcCall<TRequest, TResponse>> _commitedCallTcs;
private RetryCallBaseClientStreamReader<TRequest, TResponse>? _retryBaseClientStreamReader;
private RetryCallBaseClientStreamWriter<TRequest, TResponse>? _retryBaseClientStreamWriter;
private Task<TResponse>? _responseTask;
private Task<Metadata>? _responseHeadersTask;

// Internal for unit testing.
internal CancellationTokenRegistration? _ctsRegistration;
Expand Down Expand Up @@ -111,13 +113,34 @@ protected RetryCallBase(GrpcChannel channel, Method<TRequest, TResponse> method,
}
}

public async Task<TResponse> GetResponseAsync()
public Task<TResponse> GetResponseAsync() => _responseTask ??= GetResponseCoreAsync();

private async Task<TResponse> GetResponseCoreAsync()
{
var call = await CommitedCallTask.ConfigureAwait(false);
return await call.GetResponseAsync().ConfigureAwait(false);
}

public async Task<Metadata> GetResponseHeadersAsync()
public Task<Metadata> GetResponseHeadersAsync()
{
if (_responseHeadersTask == null)
{
_responseHeadersTask = GetResponseHeadersCoreAsync();

// ResponseHeadersAsync could be called inside a client interceptor when a call is wrapped.
// Most people won't use the headers result. Observed exception to avoid unobserved exception event.
_responseHeadersTask.ObserveException();

// If there was an error fetching response headers then it's likely the same error is reported
// by response TCS. The user is unlikely to observe both errors.
// Observed exception to avoid unobserved exception event.
_responseTask?.ObserveException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment makes it seem like the desirability of this behavior depends on the outcome of the header task, but that's not obviously reflected in the code. Will this observe/suppress the response exception if there's no header exception?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this observe/suppress the response exception if there's no header exception?

Potentially. No error getting headers should mean there isn't an error getting the body. Observing like this is a pragmatic balance.

}

return _responseHeadersTask;
}

private async Task<Metadata> GetResponseHeadersCoreAsync()
{
var call = await CommitedCallTask.ConfigureAwait(false);
return await call.GetResponseHeadersAsync().ConfigureAwait(false);
Expand Down Expand Up @@ -369,7 +392,7 @@ protected void CommitCall(IGrpcCall<TRequest, TResponse> call, CommitReason comm
// A commited call that has already cleaned up is likely a StatusGrpcCall.
if (call.Disposed)
{
Cleanup();
Cleanup(observeExceptions: false);
}
}
}
Expand All @@ -382,6 +405,11 @@ protected bool HasClientStream()
return Method.Type == MethodType.ClientStreaming || Method.Type == MethodType.DuplexStreaming;
}

protected bool HasResponseStream()
{
return Method.Type == MethodType.ServerStreaming || Method.Type == MethodType.DuplexStreaming;
}

protected void SetNewActiveCallUnsynchronized(IGrpcCall<TRequest, TResponse> call)
{
Debug.Assert(Monitor.IsEntered(Lock), "Should be called with lock.");
Expand Down Expand Up @@ -436,11 +464,11 @@ protected virtual void Dispose(bool disposing)
CommitedCallTask.Result.Dispose();
}

Cleanup();
Cleanup(observeExceptions: true);
}
}

protected void Cleanup()
protected void Cleanup(bool observeExceptions)
{
Channel.FinishActiveCall(this);

Expand All @@ -449,6 +477,12 @@ protected void Cleanup()
CancellationTokenSource.Cancel();

ClearRetryBuffer();

if (observeExceptions)
{
_responseTask?.ObserveException();
_responseHeadersTask?.ObserveException();
}
}

internal bool TryAddToRetryBuffer(ReadOnlyMemory<byte> message)
Expand Down
2 changes: 1 addition & 1 deletion src/Grpc.Net.Client/Internal/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#region Copyright notice and license
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
Expand Down
24 changes: 12 additions & 12 deletions test/Grpc.Net.Client.Tests/AsyncClientStreamingCallTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public async Task AsyncClientStreamingCall_Success_HttpRequestMessagePopulated()
var invoker = HttpClientCallInvokerFactory.Create(httpClient);

// Act
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
var call = invoker.AsyncClientStreamingCall();

await call.RequestStream.CompleteAsync().DefaultTimeout();

Expand Down Expand Up @@ -98,7 +98,7 @@ public async Task AsyncClientStreamingCall_Success_RequestContentSent()
var invoker = HttpClientCallInvokerFactory.Create(handler, "http://localhost");

// Act
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
var call = invoker.AsyncClientStreamingCall();
var requestContentTask = await requestContentTcs.Task.DefaultTimeout();

// Assert
Expand Down Expand Up @@ -149,7 +149,7 @@ public async Task ClientStreamWriter_WriteWhilePendingWrite_ErrorThrown()
var invoker = HttpClientCallInvokerFactory.Create(httpClient);

// Act
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
var call = invoker.AsyncClientStreamingCall();

// Assert
var writeTask1 = call.RequestStream.WriteAsync(new HelloRequest { Name = "1" });
Expand Down Expand Up @@ -178,7 +178,7 @@ public async Task ClientStreamWriter_DisposeWhilePendingWrite_NoReadMessageError
var invoker = HttpClientCallInvokerFactory.Create(httpClient, loggerFactory: loggerFactory);

// Act
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
var call = invoker.AsyncClientStreamingCall();

// Assert
var writeTask1 = call.RequestStream.WriteAsync(new HelloRequest { Name = "1" });
Expand Down Expand Up @@ -216,7 +216,7 @@ public async Task ClientStreamWriter_CompleteWhilePendingWrite_ErrorThrown()
var invoker = HttpClientCallInvokerFactory.Create(httpClient);

// Act
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
var call = invoker.AsyncClientStreamingCall();

// Assert
var writeTask1 = call.RequestStream.WriteAsync(new HelloRequest { Name = "1" });
Expand All @@ -240,7 +240,7 @@ public async Task ClientStreamWriter_WriteWhileComplete_ErrorThrown()
var invoker = HttpClientCallInvokerFactory.Create(httpClient);

// Act
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
var call = invoker.AsyncClientStreamingCall();
await call.RequestStream.CompleteAsync().DefaultTimeout();
var resultTask = call.ResponseAsync;

Expand Down Expand Up @@ -273,7 +273,7 @@ public async Task ClientStreamWriter_WriteWithInvalidHttpStatus_ErrorThrown()
var invoker = HttpClientCallInvokerFactory.Create(httpClient);

// Act
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
var call = invoker.AsyncClientStreamingCall();
var writeException = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.RequestStream.WriteAsync(new HelloRequest { Name = "1" })).DefaultTimeout();
var resultException = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.ResponseAsync).DefaultTimeout();

Expand All @@ -299,7 +299,7 @@ public async Task ClientStreamWriter_WriteAfterResponseHasFinished_ErrorThrown()
var invoker = HttpClientCallInvokerFactory.Create(httpClient);

// Act
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
var call = invoker.AsyncClientStreamingCall();
var ex = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.RequestStream.WriteAsync(new HelloRequest())).DefaultTimeout();
var result = await call.ResponseAsync.DefaultTimeout();

Expand Down Expand Up @@ -329,7 +329,7 @@ public async Task AsyncClientStreamingCall_ErrorWhileWriting_StatusExceptionThro
// Act

// Client starts call
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
var call = invoker.AsyncClientStreamingCall();
// Client starts request stream write
var writeTask = call.RequestStream.WriteAsync(new HelloRequest());

Expand Down Expand Up @@ -422,7 +422,7 @@ public async Task ClientStreamWriter_CancelledBeforeCallStarts_ThrowsError()
var invoker = HttpClientCallInvokerFactory.Create(httpClient);

// Act
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions(cancellationToken: new CancellationToken(true)));
var call = invoker.AsyncClientStreamingCall(new CallOptions(cancellationToken: new CancellationToken(true)));

var ex = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.RequestStream.WriteAsync(new HelloRequest())).DefaultTimeout();

Expand All @@ -442,7 +442,7 @@ public async Task ClientStreamWriter_CancelledBeforeCallStarts_ThrowOperationCan
var invoker = HttpClientCallInvokerFactory.Create(httpClient, configure: o => o.ThrowOperationCanceledOnCancellation = true);

// Act
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions(cancellationToken: new CancellationToken(true)));
var call = invoker.AsyncClientStreamingCall(new CallOptions(cancellationToken: new CancellationToken(true)));

await ExceptionAssert.ThrowsAsync<OperationCanceledException>(() => call.RequestStream.WriteAsync(new HelloRequest())).DefaultTimeout();

Expand All @@ -461,7 +461,7 @@ public async Task ClientStreamWriter_CallThrowsException_WriteAsyncThrowsError()
var invoker = HttpClientCallInvokerFactory.Create(httpClient);

// Act
var call = invoker.AsyncClientStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
var call = invoker.AsyncClientStreamingCall();
var writeException = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.RequestStream.WriteAsync(new HelloRequest())).DefaultTimeout();
var resultException = await ExceptionAssert.ThrowsAsync<RpcException>(() => call.ResponseAsync).DefaultTimeout();

Expand Down
8 changes: 4 additions & 4 deletions test/Grpc.Net.Client.Tests/AsyncDuplexStreamingCallTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public async Task AsyncDuplexStreamingCall_NoContent_NoMessagesReturned()
var invoker = HttpClientCallInvokerFactory.Create(httpClient);

// Act
var call = invoker.AsyncDuplexStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
var call = invoker.AsyncDuplexStreamingCall();

var responseStream = call.ResponseStream;

Expand All @@ -76,7 +76,7 @@ public async Task AsyncServerStreamingCall_MessagesReturnedTogether_MessagesRece
var invoker = HttpClientCallInvokerFactory.Create(httpClient);

// Act
var call = invoker.AsyncDuplexStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
var call = invoker.AsyncDuplexStreamingCall();

var responseStream = call.ResponseStream;

Expand Down Expand Up @@ -109,7 +109,7 @@ public async Task AsyncDuplexStreamingCall_MessagesStreamed_MessagesReceived()
var invoker = HttpClientCallInvokerFactory.Create(handler, "https://localhost");

// Act
var call = invoker.AsyncDuplexStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions());
var call = invoker.AsyncDuplexStreamingCall();

var requestStream = call.RequestStream;
var responseStream = call.ResponseStream;
Expand Down Expand Up @@ -216,7 +216,7 @@ public async Task AsyncDuplexStreamingCall_CancellationDisposeRace_Success()

var cts = new CancellationTokenSource();

var call = invoker.AsyncDuplexStreamingCall<HelloRequest, HelloReply>(ClientTestHelpers.ServiceMethod, string.Empty, new CallOptions(cancellationToken: cts.Token));
var call = invoker.AsyncDuplexStreamingCall(new CallOptions(cancellationToken: cts.Token));
await call.RequestStream.WriteAsync(new HelloRequest { Name = "1" }).DefaultTimeout();
await call.RequestStream.CompleteAsync().DefaultTimeout();

Expand Down
Loading
Loading