Skip to content

Commit 75d5fc1

Browse files
authored
Fix handling for OnNoData when ignoring deletes (#790)
1 parent ebfbf39 commit 75d5fc1

File tree

2 files changed

+100
-0
lines changed

2 files changed

+100
-0
lines changed

src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,17 @@ private async Task CommandLoop()
259259

260260
if (_opts.IgnoreDeletes && operation is NatsKVOperation.Del or NatsKVOperation.Purge)
261261
{
262+
// Check in case all entries are deleted and we want to terminate
263+
// the watcher loop on no-data.
264+
if (msg.Metadata?.NumPending == 0 && _opts.OnNoData != null)
265+
{
266+
if (await _opts.OnNoData(_cancellationToken))
267+
{
268+
_entryChannel.Writer.Complete();
269+
return;
270+
}
271+
}
272+
262273
continue;
263274
}
264275

tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,4 +619,93 @@ await Assert.ThrowsAsync<InvalidOperationException>(async () =>
619619
});
620620
}
621621
}
622+
623+
[Fact]
624+
public async Task ReadAfterDelete()
625+
{
626+
await using var server = await NatsServer.StartJSAsync();
627+
await using var nats = await server.CreateClientConnectionAsync();
628+
629+
var js = new NatsJSContext(nats);
630+
var kv = new NatsKVContext(js);
631+
632+
var store = await kv.CreateStoreAsync("b1");
633+
634+
// Read all entries, should be empty.
635+
List<NatsKVEntry<string>> results = new();
636+
await foreach (var entry in store.WatchAsync<string>(">", opts: new NatsKVWatchOpts
637+
{
638+
OnNoData = (_) => ValueTask.FromResult(true),
639+
}))
640+
{
641+
results.Add(entry);
642+
}
643+
644+
// Should be no results here.
645+
Assert.False(results.Any());
646+
647+
// Add k1
648+
await store.PutAsync("k1", "v1");
649+
650+
// Check if there, should be true
651+
var result1 = await store.TryGetEntryAsync<string>("k1");
652+
Assert.True(result1.Success);
653+
654+
// Remove k1
655+
await store.DeleteAsync("k1");
656+
657+
// Check if there, should be false
658+
var result = await store.TryGetEntryAsync<string>("k1");
659+
Assert.False(result.Success);
660+
661+
// Read all entries.
662+
results.Clear();
663+
await foreach (var entry in store.WatchAsync<string>(">", opts: new NatsKVWatchOpts
664+
{
665+
OnNoData = (_) => ValueTask.FromResult(true),
666+
}))
667+
{
668+
results.Add(entry);
669+
if (entry.Delta == 0)
670+
{
671+
break;
672+
}
673+
}
674+
675+
// Should be 1 entry, which is the deleted OP
676+
Assert.Single(results);
677+
Assert.Equal(NatsKVOperation.Del, results[0].Operation);
678+
679+
// Watch and ignore deletes.... Really OnNoData should execute as we're excluding deletes and there should be no entries coming back, but this just times out.
680+
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
681+
var cancellationToken = cts.Token;
682+
683+
results.Clear();
684+
685+
try
686+
{
687+
await foreach (var entry in store.WatchAsync<string>(
688+
">",
689+
opts: new NatsKVWatchOpts
690+
{
691+
IgnoreDeletes = true,
692+
OnNoData = (_) => ValueTask.FromResult(true),
693+
},
694+
cancellationToken: cancellationToken))
695+
{
696+
results.Add(entry);
697+
if (entry.Delta == 0)
698+
{
699+
break;
700+
}
701+
}
702+
}
703+
catch (TaskCanceledException exCancelled)
704+
{
705+
Assert.Fail("Task was cancelled waiting for OnNoData");
706+
}
707+
708+
// Should be no results here.
709+
Assert.False(results.Any());
710+
}
622711
}

0 commit comments

Comments
 (0)