Skip to content

Commit e70fea2

Browse files
committed
xdsclient: delay resource cache deletion to handle immediate re-subscription of same resource
1 parent ec4810c commit e70fea2

File tree

6 files changed

+127
-24
lines changed

6 files changed

+127
-24
lines changed

xds/internal/clients/xdsclient/ads_stream.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,10 @@ type dataAndErrTuple struct {
6868
// occur on the ADS stream. Methods on this interface may be invoked
6969
// concurrently and implementations need to handle them in a thread-safe manner.
7070
type adsStreamEventHandler interface {
71-
onStreamError(error) // Called when the ADS stream breaks.
72-
onWatchExpiry(ResourceType, string) // Called when the watch timer expires for a resource.
73-
onResponse(response, func()) ([]string, error) // Called when a response is received on the ADS stream.
71+
onStreamError(error) // Called when the ADS stream breaks.
72+
onWatchExpiry(ResourceType, string) // Called when the watch timer expires for a resource.
73+
onResponse(response, func()) ([]string, error) // Called when a response is received on the ADS stream.
74+
onRequiredToRemoveUnsubscribedCacheEntries(typeURL string) // Called when it is needed to remove unsubscribed cache entries.
7475
}
7576

7677
// state corresponding to a resource type.
@@ -444,6 +445,9 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string,
444445
}
445446
}
446447

448+
// Call the event handler to remove unsubscribed cache entries.
449+
s.eventHandler.onRequiredToRemoveUnsubscribedCacheEntries(url)
450+
447451
msg, err := proto.Marshal(req)
448452
if err != nil {
449453
s.logger.Warningf("Failed to marshal DiscoveryRequest: %v", err)
@@ -460,6 +464,7 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string,
460464
} else if s.logger.V(2) {
461465
s.logger.Warningf("ADS request sent for type %q, resources: %v, version: %q, nonce: %q", url, names, version, nonce)
462466
}
467+
463468
return nil
464469
}
465470

xds/internal/clients/xdsclient/authority.go

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,17 @@ func (a *authority) watchResource(rType ResourceType, resourceName string, watch
655655
}
656656
resources[resourceName] = state
657657
xdsChannel.channel.subscribe(rType, resourceName)
658+
} else if len(state.watchers) == 0 {
659+
if a.logger.V(2) {
660+
a.logger.Infof("Re-watch for type %q, resource name %q before unsubscription", rType.TypeName, resourceName)
661+
}
662+
// Add the active channel to the resource's channel configs if not
663+
// already present.
664+
state.xdsChannelConfigs[xdsChannel] = true
665+
// Ensure the resource is subscribed on the active channel. We do this
666+
// even if resource is present in cache as re-watches might occur
667+
// after unsubscribes or channel changes.
668+
xdsChannel.channel.subscribe(rType, resourceName)
658669
}
659670
// Always add the new watcher to the set of watchers.
660671
state.watchers[watcher] = true
@@ -720,6 +731,10 @@ func (a *authority) unwatchResource(rType ResourceType, resourceName string, wat
720731
// there when the watch was registered.
721732
resources := a.resources[rType]
722733
state := resources[resourceName]
734+
if state == nil {
735+
a.logger.Warningf("Attempting to unwatch resource %q of type %q which is not currently watched", resourceName, rType.TypeName)
736+
return
737+
}
723738

724739
// Delete this particular watcher from the list of watchers, so that its
725740
// callback will not be invoked in the future.
@@ -732,32 +747,16 @@ func (a *authority) unwatchResource(rType ResourceType, resourceName string, wat
732747
}
733748

734749
// There are no more watchers for this resource. Unsubscribe this
735-
// resource from all channels where it was subscribed to and delete
736-
// the state associated with it.
750+
// resource from all channels where it was subscribed to but do not
751+
// delete the state associated with it in case the resource is
752+
// re-requested later before un-subscription request is completed by
753+
// the management server.
737754
if a.logger.V(2) {
738755
a.logger.Infof("Removing last watch for resource name %q", resourceName)
739756
}
740757
for xcc := range state.xdsChannelConfigs {
741758
xcc.channel.unsubscribe(rType, resourceName)
742759
}
743-
delete(resources, resourceName)
744-
745-
// If there are no more watchers for this resource type, delete the
746-
// resource type from the top-level map.
747-
if len(resources) == 0 {
748-
if a.logger.V(2) {
749-
a.logger.Infof("Removing last watch for resource type %q", rType.TypeName)
750-
}
751-
delete(a.resources, rType)
752-
}
753-
// If there are no more watchers for any resource type, release the
754-
// reference to the xdsChannels.
755-
if len(a.resources) == 0 {
756-
if a.logger.V(2) {
757-
a.logger.Infof("Removing last watch for for any resource type, releasing reference to the xdsChannel")
758-
}
759-
a.closeXDSChannels()
760-
}
761760
}, func() { close(done) })
762761
<-done
763762
})
@@ -874,6 +873,41 @@ func (a *authority) close() {
874873
}
875874
}
876875

876+
// removeUnsubscribedCacheEntries iterates through all resources of the given type and
877+
// removes the state for resources that have no active watchers. It is
878+
// scheduled on the authority's serializer by
879+
// channelState.requiredToRemoveUnsubscribedCacheEntries, which is triggered
880+
// after a discovery request is sent and when an xdsChannel is closed, so that
// resources that were unsubscribed (and thus have no watchers) are eventually
// removed from the authority's cache.
//
// After pruning, an empty per-type map is removed from a.resources, and when
// a.resources itself becomes empty the authority's xdsChannels are closed via
// closeXDSChannels.
881+
func (a *authority) removeUnsubscribedCacheEntries(rType ResourceType) {
882+
resources := a.resources[rType]
883+
// No map for this resource type: nothing to prune. Note that this early
// return also skips the closeXDSChannels check at the end of the function.
if resources == nil {
884+
return
885+
}
886+
887+
// Deleting the current key from a map while ranging over it is permitted
// in Go, so entries are removed in place.
for name, state := range resources {
888+
if len(state.watchers) == 0 {
889+
if a.logger.V(2) {
890+
a.logger.Infof("Removing resource state for %q of type %q as it has no watchers after an update cycle", name, rType.TypeName)
891+
}
892+
delete(resources, name)
893+
}
894+
}
895+
896+
// Drop the per-type map once its last resource has been removed.
if len(resources) == 0 {
897+
if a.logger.V(2) {
898+
a.logger.Infof("Removing resource type %q from cache as it has no more resources", rType.TypeName)
899+
}
900+
delete(a.resources, rType)
901+
}
902+
903+
// Nothing of any type is cached any more: close the xdsChannels.
if len(a.resources) == 0 {
904+
if a.logger.V(2) {
905+
a.logger.Infof("Removing last watch for any resource type, releasing reference to the xdsChannels")
906+
}
907+
a.closeXDSChannels()
908+
}
909+
}
910+
877911
func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus {
878912
switch serviceStatus {
879913
case xdsresource.ServiceStatusUnknown:

xds/internal/clients/xdsclient/channel.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ type xdsChannelEventHandler interface {
5959
// adsResourceDoesNotExist is called when the xdsChannel determines that a
6060
// requested ADS resource does not exist.
6161
adsResourceDoesNotExist(ResourceType, string)
62+
63+
// requiredToRemoveUnsubscribedCacheEntries is called when the xdsChannel
64+
// needs to remove unsubscribed cache entries.
65+
requiredToRemoveUnsubscribedCacheEntries(ResourceType)
6266
}
6367

6468
// xdsChannelOpts holds the options for creating a new xdsChannel.
@@ -136,8 +140,30 @@ type xdsChannel struct {
136140
}
137141

138142
// close shuts the channel down. It marks the channel as closed, records which
// resource types the ADS stream was tracking, stops the ADS stream, asks the
// event handler to remove unsubscribed cache entries for each of those types,
// closes the transport and logs "Shutdown".
func (xc *xdsChannel) close() {
143+
// Return early if the closed event has already fired.
// NOTE(review): HasFired followed by Fire is a check-then-act sequence; if
// close can be invoked concurrently, two callers could both get past this
// check — confirm that callers serialize close.
if xc.closed.HasFired() {
144+
return
145+
}
139146
xc.closed.Fire()
147+
148+
// Get the resource types that this specific ADS stream was handling
149+
// before stopping it. The keys of resourceTypeState are copied while
// holding the ADS stream's mutex.
150+
xc.ads.mu.Lock()
151+
typesHandledByStream := make([]ResourceType, 0, len(xc.ads.resourceTypeState))
152+
for typ := range xc.ads.resourceTypeState {
153+
typesHandledByStream = append(typesHandledByStream, typ)
154+
}
155+
xc.ads.mu.Unlock()
156+
140157
xc.ads.Stop()
158+
159+
// Schedule removeUnsubscribedCacheEntries for the types this stream was handling,
160+
// on all authorities that were interested in this channel.
// The notification is only sent when the event handler's concrete type is
// *channelState; any other implementation of the interface is skipped.
// NOTE(review): asserting on a concrete type here bypasses the
// xdsChannelEventHandler interface — confirm this is intentional.
161+
if _, ok := xc.eventHandler.(*channelState); ok {
162+
for _, typ := range typesHandledByStream {
163+
xc.eventHandler.requiredToRemoveUnsubscribedCacheEntries(typ)
164+
}
165+
}
166+
141167
xc.transport.Close()
142168
xc.logger.Infof("Shutdown")
143169
}
@@ -228,6 +254,24 @@ func (xc *xdsChannel) onResponse(resp response, onDone func()) ([]string, error)
228254
return names, err
229255
}
230256

257+
// onRequiredToRemoveUnsubscribedCacheEntries is invoked by the ADS stream when
// cache entries for resources that are no longer subscribed to should be
// removed. It maps typeURL to the configured ResourceType and forwards the
// request to the channel's event handler.
//
// The call is dropped when the channel has already been closed, or when
// typeURL is not present in the client's configured resource types.
func (xc *xdsChannel) onRequiredToRemoveUnsubscribedCacheEntries(typeURL string) {
258+
if xc.closed.HasFired() {
259+
if xc.logger.V(2) {
260+
xc.logger.Infof("Ignoring request to remove unsubscribed cache entries on closed ADS stream")
261+
}
262+
return
263+
}
264+
265+
// Look up the resource type registered for this type URL.
266+
rType, ok := xc.clientConfig.ResourceTypes[typeURL]
267+
if !ok {
268+
xc.logger.Warningf("Resource type URL %q unknown when removing unsubscribed cache entries", typeURL)
269+
return
270+
}
271+
272+
xc.eventHandler.requiredToRemoveUnsubscribedCacheEntries(rType)
273+
}
274+
231275
// decodeResponse decodes the resources in the given ADS response.
232276
//
233277
// The opts parameter provides configuration options for decoding the resources.

xds/internal/clients/xdsclient/channel_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,3 +772,6 @@ func (ta *testEventHandler) waitForResourceDoesNotExist(ctx context.Context) (Re
772772
}
773773
return typ, name, nil
774774
}
775+
776+
// requiredToRemoveUnsubscribedCacheEntries is a no-op on testEventHandler: the
// body is empty and the ResourceType argument is ignored.
func (*testEventHandler) requiredToRemoveUnsubscribedCacheEntries(ResourceType) {
777+
}

xds/internal/clients/xdsclient/test/ads_stream_ack_nack_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,8 +469,10 @@ func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) {
469469
}
470470

471471
// Cancel the watch on the listener resource. This should result in the
472-
// existing connection to be management server getting closed.
472+
// existing connection to the management server getting closed after the
473+
// unsubscription discovery request is sent.
473474
ldsCancel()
475+
// Verify that the connection to the management server is closed.
474476
if _, err := streamCloseCh.Receive(ctx); err != nil {
475477
t.Fatalf("Timeout when expecting existing connection to be closed: %v", err)
476478
}

xds/internal/clients/xdsclient/xdsclient.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,21 @@ func (cs *channelState) adsResourceDoesNotExist(typ ResourceType, resourceName s
439439
}
440440
}
441441

442+
// requiredToRemoveUnsubscribedCacheEntries fans the cleanup request for rType
// out to every authority in cs.interestedAuthorities. For each authority it
// schedules a callback on that authority's serializer which calls
// removeUnsubscribedCacheEntries(rType). It does nothing once the parent
// client's done event has fired.
func (cs *channelState) requiredToRemoveUnsubscribedCacheEntries(rType ResourceType) {
443+
// The parent client is already done: skip the cleanup.
if cs.parent.done.HasFired() {
444+
return
445+
}
446+
447+
// channelsMu is held for the whole iteration over interestedAuthorities.
cs.parent.channelsMu.Lock()
448+
defer cs.parent.channelsMu.Unlock()
449+
450+
for authority := range cs.interestedAuthorities {
451+
// The removal is scheduled on the authority's serializer rather than
// performed inline here.
authority.xdsClientSerializer.TrySchedule(func(context.Context) {
452+
authority.removeUnsubscribedCacheEntries(rType)
453+
})
454+
}
455+
}
456+
442457
func resourceWatchStateForTesting(c *XDSClient, rType ResourceType, resourceName string) (xdsresource.ResourceWatchState, error) {
443458
c.channelsMu.Lock()
444459
defer c.channelsMu.Unlock()

0 commit comments

Comments
 (0)