@@ -527,3 +527,139 @@ func (s) TestWatchErrorsContainNodeID_ChannelCreationFailure(t *testing.T) {
527
527
}
528
528
}
529
529
}
530
+
531
+ // TestUnsubscribeAndResubscribe tests the scenario where the client is busy
532
+ // processing a response (simulating a pending ACK at a higher level by holding
533
+ // the onDone callback from watchers). During this busy state, a resource is
534
+ // unsubscribed and then immediately resubscribed which causes the
535
+ // unsubscription and new subscription requests to be buffered due to flow
536
+ // control.
537
+ //
538
+ // The test verifies the following:
539
+ // - The resubscribed resource is served from the cache.
540
+ // - No "resource does not exist" error is generated for the resubscribed
541
+ // resource.
542
+ func (s ) TestRaceUnsubscribeResubscribe (t * testing.T ) {
543
+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
544
+ defer cancel ()
545
+
546
+ mgmtServer := e2e .StartManagementServer (t , e2e.ManagementServerOptions {})
547
+ nodeID := uuid .New ().String ()
548
+
549
+ resourceTypes := map [string ]xdsclient.ResourceType {xdsresource .V3ListenerURL : listenerType }
550
+ si := clients.ServerIdentifier {
551
+ ServerURI : mgmtServer .Address ,
552
+ Extensions : grpctransport.ServerIdentifierExtension {ConfigName : "insecure" },
553
+ }
554
+
555
+ configs := map [string ]grpctransport.Config {"insecure" : {Credentials : insecure .NewBundle ()}}
556
+ xdsClientConfig := xdsclient.Config {
557
+ Servers : []xdsclient.ServerConfig {{ServerIdentifier : si }},
558
+ Node : clients.Node {ID : nodeID },
559
+ TransportBuilder : grpctransport .NewBuilder (configs ),
560
+ ResourceTypes : resourceTypes ,
561
+ // Xdstp resource names used in this test do not specify an
562
+ // authority. These will end up looking up an entry with the
563
+ // empty key in the authorities map. Having an entry with an
564
+ // empty key and empty configuration, results in these
565
+ // resources also using the top-level configuration.
566
+ Authorities : map [string ]xdsclient.Authority {
567
+ "" : {XDSServers : []xdsclient.ServerConfig {}},
568
+ },
569
+ }
570
+
571
+ // Create an xDS client with the above config.
572
+ client , err := xdsclient .New (xdsClientConfig )
573
+ if err != nil {
574
+ t .Fatalf ("Failed to create xDS client: %v" , err )
575
+ }
576
+ defer client .Close ()
577
+
578
+ ldsResourceName1 := "test-listener-resource1"
579
+ ldsResourceName2 := "test-route-configuration-resource1"
580
+ rdsName1 := "test-listener-resource2"
581
+ rdsName2 := "test-route-configuration-resource2"
582
+ listenerResource1 := e2e .DefaultClientListener (ldsResourceName1 , rdsName1 )
583
+ listenerResource2 := e2e .DefaultClientListener (ldsResourceName2 , rdsName2 )
584
+
585
+ // Watch ldsResourceName1 with a regular watcher to ensure it's in cache
586
+ // and ACKed.
587
+ watcherInitial := newListenerWatcher ()
588
+ cancelInitial := client .WatchResource (xdsresource .V3ListenerURL , ldsResourceName1 , watcherInitial )
589
+ if err := mgmtServer .Update (ctx , e2e.UpdateOptions {NodeID : nodeID , Listeners : []* v3listenerpb.Listener {listenerResource1 }, SkipValidation : true }); err != nil {
590
+ t .Fatalf ("mgmtServer.Update() for %s failed: %v" , ldsResourceName1 , err )
591
+ }
592
+ if err := verifyListenerUpdate (ctx , watcherInitial .updateCh , listenerUpdateErrTuple {update : listenerUpdate {RouteConfigName : rdsName1 }}); err != nil {
593
+ t .Fatalf ("watcherR1Initial did not receive update for %s: %v" , ldsResourceName1 , err )
594
+ }
595
+ cancelInitial ()
596
+
597
+ // Watch ldsResourceName1 and ldsResourceName2 using blocking watchers.
598
+ // - Server sends {ldsResourceName1, ldsResourceName2}.
599
+ // - Watchers for both resources get the update but we HOLD on to their
600
+ // onDone callbacks.
601
+ blockingWatcherR1 := newBLockingListenerWatcher ()
602
+ cancelR1 := client .WatchResource (xdsresource .V3ListenerURL , ldsResourceName1 , blockingWatcherR1 )
603
+ // defer cancelR1 later to create the race
604
+
605
+ blockingWatcherR2 := newBLockingListenerWatcher ()
606
+ cancelR2 := client .WatchResource (xdsresource .V3ListenerURL , ldsResourceName2 , blockingWatcherR2 )
607
+ defer cancelR2 ()
608
+
609
+ // Configure the listener resources on the management server.
610
+ resources := e2e.UpdateOptions {
611
+ NodeID : nodeID ,
612
+ Listeners : []* v3listenerpb.Listener {listenerResource1 , listenerResource2 },
613
+ SkipValidation : true }
614
+ if err := mgmtServer .Update (ctx , resources ); err != nil {
615
+ t .Fatalf ("mgmtServer.Update() for %s and %s failed: %v" , ldsResourceName1 , ldsResourceName2 , err )
616
+ }
617
+
618
+ var onDoneR1 , onDoneR2 func ()
619
+ select {
620
+ case <- blockingWatcherR1 .updateCh :
621
+ onDoneR1 = <- blockingWatcherR1 .doneNotifierCh
622
+ case <- ctx .Done ():
623
+ t .Fatalf ("Timeout waiting for update for %s on blockingWatcherR1: %v" , ldsResourceName1 , ctx .Err ())
624
+ }
625
+ select {
626
+ case <- blockingWatcherR2 .updateCh :
627
+ onDoneR2 = <- blockingWatcherR2 .doneNotifierCh
628
+ case <- ctx .Done ():
629
+ t .Fatalf ("Timeout waiting for update for %s on blockingWatcherR2: %v" , ldsResourceName2 , ctx .Err ())
630
+ }
631
+
632
+ // At this point, ACK for {listenerResource1,listenerResource2} has been
633
+ // sent by the client but s.fc.pending.Load() is true because onDoneR1 and
634
+ // onDoneR2 are held.
635
+ //
636
+ // Unsubscribe listenerResource1. This request should be buffered by
637
+ // adsStreamImpl because s.fc.pending.Load() is true.
638
+ cancelR1 ()
639
+
640
+ // Resubscribe listenerResource1 with a new regular watcher, which should
641
+ // be served from cache.
642
+ watcherR1New := newListenerWatcher ()
643
+ cancelR1New := client .WatchResource (xdsresource .V3ListenerURL , ldsResourceName1 , watcherR1New )
644
+ defer cancelR1New ()
645
+
646
+ if err := verifyListenerUpdate (ctx , watcherR1New .updateCh , listenerUpdateErrTuple {update : listenerUpdate {RouteConfigName : rdsName1 }}); err != nil {
647
+ t .Fatalf ("watcherR1New did not receive cached update for %s: %v" , ldsResourceName1 , err )
648
+ }
649
+
650
+ // Release the onDone callbacks.
651
+ if onDoneR1 != nil { // onDoneR1 might be nil if cancelR1() completed very fast.
652
+ onDoneR1 ()
653
+ }
654
+ onDoneR2 ()
655
+
656
+ // Verify watcherR1New does not get a "resource does not exist" error.
657
+ sCtx , sCancel := context .WithTimeout (ctx , defaultTestShortTimeout * 10 ) // Slightly longer to catch delayed errors
658
+ defer sCancel ()
659
+ if err := verifyNoListenerUpdate (sCtx , watcherR1New .resourceErrCh ); err != nil {
660
+ t .Fatalf ("watcherR1New received unexpected resource error for %s: %v" , ldsResourceName1 , err )
661
+ }
662
+ if err := verifyNoListenerUpdate (sCtx , watcherR1New .ambientErrCh ); err != nil {
663
+ t .Fatalf ("watcherR1New received unexpected ambient error for %s: %v" , ldsResourceName1 , err )
664
+ }
665
+ }
0 commit comments