Skip to content

Commit f38ffdd

Browse files
committed
authmailbox: wait for multisub reconnects in restart test
- Wait for the shared MultiSubscription to report active subscriptions again after the mock server restarts before publishing msg2. - The test already waited for the two direct clients, but not for the multi-subscription, which left a race where the post-restart message could be published before that consumer had reconnected. - Because this subscription uses an empty filter, a missed publish is not recovered from backlog delivery and the test times out on readMultiSub. - Keep the fix test-only so mailbox runtime behavior is unchanged. - This makes TestServerClientAuthAndRestart pass reliably under race runs and matches the failure seen in CI shard 0.
1 parent b0b942d commit f38ffdd

File tree

2 files changed

+34
-4
lines changed

2 files changed

+34
-4
lines changed

authmailbox/client_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/btcsuite/btclog/v2"
12+
"github.com/lightninglabs/taproot-assets/asset"
1213
"github.com/lightninglabs/taproot-assets/fn"
1314
"github.com/lightninglabs/taproot-assets/internal/test"
1415
"github.com/lightninglabs/taproot-assets/proof"
@@ -169,6 +170,32 @@ func TestServerClientAuthAndRestart(t *testing.T) {
169170
t.Cleanup(func() {
170171
require.NoError(t, multiSub.Stop())
171172
})
173+
assertMultiSubConnected := func(targetKeys ...keychain.KeyDescriptor) {
174+
t.Helper()
175+
176+
serverURL := url.URL{Host: clientCfg.ServerAddress}
177+
require.Eventually(t, func() bool {
178+
multiSub.RLock()
179+
defer multiSub.RUnlock()
180+
181+
client, ok := multiSub.clients[serverURL]
182+
if !ok {
183+
return false
184+
}
185+
186+
for _, targetKey := range targetKeys {
187+
key := asset.ToSerialized(targetKey.PubKey)
188+
subscription, ok := client.subscriptions[key]
189+
if !ok || !subscription.IsSubscribed() {
190+
return false
191+
}
192+
}
193+
194+
return true
195+
}, testTimeout, testMinBackoff)
196+
}
197+
assertMultiSubConnected(clientKey1, clientKey2)
198+
172199
msgChan := multiSub.MessageChan()
173200
readMultiSub := func(targetID ...uint64) {
174201
t.Helper()
@@ -221,6 +248,7 @@ func TestServerClientAuthAndRestart(t *testing.T) {
221248
harness.Start(t)
222249
client1.assertConnected(t)
223250
client2.assertConnected(t)
251+
assertMultiSubConnected(clientKey1, clientKey2)
224252

225253
// Let's send another message to all clients.
226254
msg2 := &Message{

authmailbox/receive_subscription.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"io"
88
"math"
99
"sync"
10+
"sync/atomic"
1011
"time"
1112

1213
"github.com/btcsuite/btclog/v2"
@@ -61,6 +62,7 @@ type receiveSubscription struct {
6162
serverStream clientStream
6263
streamMutex sync.RWMutex
6364
streamCancel func()
65+
subscribed atomic.Bool
6466

6567
authOkChan chan struct{}
6668
msgChan chan<- *ReceivedMessages
@@ -198,10 +200,7 @@ func (s *receiveSubscription) wait(backoff time.Duration) error {
198200
// IsSubscribed returns true if at least one account is in an active state and
199201
// the subscription stream to the server was established successfully.
200202
func (s *receiveSubscription) IsSubscribed() bool {
201-
s.streamMutex.RLock()
202-
defer s.streamMutex.RUnlock()
203-
204-
return s.serverStream != nil
203+
return s.subscribed.Load()
205204
}
206205

207206
// connectServerStream opens the initial connection to the server for the stream
@@ -412,6 +411,8 @@ func (s *receiveSubscription) readIncomingStream(ctx context.Context) {
412411
// The server confirms the account subscription. Nothing for us
413412
// to do here.
414413
case *respTypeAuthSuccess:
414+
s.subscribed.Store(true)
415+
415416
// Inform the subscription about the arrived auth
416417
// confirmation.
417418
select {
@@ -476,6 +477,7 @@ func (s *receiveSubscription) closeStream(ctx context.Context) error {
476477

477478
s.streamMutex.Lock()
478479
defer s.streamMutex.Unlock()
480+
s.subscribed.Store(false)
479481

480482
if s.streamCancel != nil {
481483
s.streamCancel()

0 commit comments

Comments
 (0)