Skip to content

Commit 75f74da

Browse files
authored
fix: SendAsync break FlushAsync when RentConnection throw exception
When `_connectionPool.RentConnection() ` throw exception The `publisher.PublishAsync` will throw exception,
1 parent 6b61df5 commit 75f74da

File tree

1 file changed

+29
-22
lines changed

1 file changed

+29
-22
lines changed

src/DotNetCore.CAP.NATS/ITransport.NATS.cs

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,40 +30,47 @@ public NATSTransport(ILogger<NATSTransport> logger, IConnectionPool connectionPo
3030

3131
public async Task<OperateResult> SendAsync(TransportMessage message)
3232
{
33-
var connection = _connectionPool.RentConnection();
34-
3533
try
3634
{
37-
var msg = new Msg(message.GetName(), message.Body.ToArray());
38-
foreach (var header in message.Headers)
35+
var connection = _connectionPool.RentConnection();
36+
try
3937
{
40-
msg.Header[header.Key] = header.Value;
41-
}
38+
var msg = new Msg(message.GetName(), message.Body.ToArray());
39+
foreach (var header in message.Headers)
40+
{
41+
msg.Header[header.Key] = header.Value;
42+
}
4243

43-
var js = connection.CreateJetStreamContext(_jetStreamOptions);
44+
var js = connection.CreateJetStreamContext(_jetStreamOptions);
4445

45-
var builder = PublishOptions.Builder().WithMessageId(message.GetId());
46+
var builder = PublishOptions.Builder().WithMessageId(message.GetId());
4647

47-
var resp = await js.PublishAsync(msg, builder.Build());
48+
var resp = await js.PublishAsync(msg, builder.Build());
4849

49-
if (resp.Seq > 0)
50-
{
51-
_logger.LogDebug($"NATS stream message [{message.GetName()}] has been published.");
50+
if (resp.Seq > 0)
51+
{
52+
_logger.LogDebug($"NATS stream message [{message.GetName()}] has been published.");
5253

53-
return OperateResult.Success;
54+
return OperateResult.Success;
55+
}
56+
57+
throw new PublisherSentFailedException("NATS message send failed, no consumer reply!");
5458
}
59+
catch (Exception ex)
60+
{
61+
var warpEx = new PublisherSentFailedException(ex.Message, ex);
5562

56-
throw new PublisherSentFailedException("NATS message send failed, no consumer reply!");
63+
return OperateResult.Failed(warpEx);
64+
}
65+
finally
66+
{
67+
_connectionPool.Return(connection);
68+
}
5769
}
58-
catch (Exception ex)
70+
catch (Exception e)
5971
{
60-
var warpEx = new PublisherSentFailedException(ex.Message, ex);
61-
72+
var warpEx = new PublisherSentFailedException(e.Message, e);
6273
return OperateResult.Failed(warpEx);
6374
}
64-
finally
65-
{
66-
_connectionPool.Return(connection);
67-
}
6875
}
69-
}
76+
}

0 commit comments

Comments
 (0)