Skip to content

Commit 819c982

Browse files
committed
Adjust the timing for verifying whether CAP is started. After CAP stops, failed messages will be stored in the storage to be retried later upon restart. (#1238)
1 parent 516ebbd commit 819c982

File tree

3 files changed

+10
-16
lines changed

3 files changed

+10
-16
lines changed

src/DotNetCore.CAP/ICapTransaction.Base.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,7 @@ protected virtual async Task FlushAsync()
5959
var isDelayMessage = message.Origin.Headers.ContainsKey(Headers.DelayTime);
6060
if (isDelayMessage)
6161
{
62-
6362
await _dispatcher.EnqueueToScheduler(message, DateTime.Parse(message.Origin.Headers[Headers.SentTime]!, CultureInfo.InvariantCulture)).ConfigureAwait(false);
64-
6563
}
6664
else
6765
{

src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,12 @@ internal class CapPublisher : ICapPublisher
2626
private readonly ISnowflakeId _snowflakeId;
2727
private readonly IDispatcher _dispatcher;
2828
private readonly IDataStorage _storage;
29-
private readonly IBootstrapper _bootstrapper;
3029

3130
private readonly AsyncLocal<CapTransactionHolder> _asyncLocal;
3231

3332
public CapPublisher(IServiceProvider service)
3433
{
3534
ServiceProvider = service;
36-
_bootstrapper = service.GetRequiredService<IBootstrapper>();
3735
_dispatcher = service.GetRequiredService<IDispatcher>();
3836
_storage = service.GetRequiredService<IDataStorage>();
3937
_capOptions = service.GetRequiredService<IOptions<CapOptions>>().Value;
@@ -43,8 +41,8 @@ public CapPublisher(IServiceProvider service)
4341

4442
public IServiceProvider ServiceProvider { get; }
4543

46-
public ICapTransaction? Transaction {
47-
44+
public ICapTransaction? Transaction
45+
{
4846
get => _asyncLocal.Value?.Transaction;
4947
set
5048
{
@@ -114,11 +112,6 @@ public void PublishDelay<T>(TimeSpan delayTime, string name, T? value, string? c
114112
private async Task PublishInternalAsync<T>(string name, T? value, IDictionary<string, string?> headers, TimeSpan? delayTime = null,
115113
CancellationToken cancellationToken = default)
116114
{
117-
if (!_bootstrapper.IsStarted)
118-
{
119-
throw new InvalidOperationException("CAP has not been started!");
120-
}
121-
122115
if (string.IsNullOrEmpty(name)) throw new ArgumentNullException(nameof(name));
123116

124117
if (!string.IsNullOrEmpty(_capOptions.TopicNamePrefix)) name = $"{_capOptions.TopicNamePrefix}.{name}";
@@ -176,8 +169,7 @@ private async Task PublishInternalAsync<T>(string name, T? value, IDictionary<st
176169
{
177170
var transaction = (CapTransactionBase)Transaction;
178171

179-
var mediumMessage = await _storage.StoreMessageAsync(name, message, transaction.DbTransaction)
180-
.ConfigureAwait(false);
172+
var mediumMessage = await _storage.StoreMessageAsync(name, message, transaction.DbTransaction).ConfigureAwait(false);
181173

182174
TracingAfter(tracingTimestamp, message);
183175

src/DotNetCore.CAP/Processor/IDispatcher.Default.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,10 @@ await Task.WhenAll(Enumerable.Range(0, _options.SubscriberParallelExecuteThreadC
112112
}
113113
catch (Exception ex)
114114
{
115-
_logger.LogWarning(ex,
115+
_logger.LogWarning(ex,
116116
"Scheduled message publishing failed unexpectedly, which will stop future scheduled " +
117117
"messages from publishing. See more details here: https://github.com/dotnetcore/CAP/issues/1637. " +
118-
"Exception: {Message}",
118+
"Exception: {Message}",
119119
ex.Message);
120120
throw;
121121
}
@@ -145,7 +145,11 @@ public async ValueTask EnqueueToPublish(MediumMessage message)
145145
{
146146
try
147147
{
148-
if (_tasksCts!.IsCancellationRequested) return;
148+
if (_tasksCts!.IsCancellationRequested)
149+
{
150+
_logger.LogWarning("The message has been persisted, but CAP is currently stopped. It will be attempted to be sent once CAP becomes available.");
151+
return;
152+
}
149153

150154
if (_enableParallelSend && message.Retries == 0)
151155
{

0 commit comments

Comments
 (0)