Skip to content

Commit 3ad2b0a

Browse files
ramaraochavaliistio-testing
authored andcommitted
service entry: optimize service entry updates (#19305)
* optimize service entry updates Signed-off-by: Rama Chavali <[email protected]> * move to xds updater and kill handlers for service entries Signed-off-by: Rama Chavali <[email protected]> * fix ut and dependency ordering Signed-off-by: Rama Chavali <[email protected]> * lint fixes Signed-off-by: Rama Chavali <[email protected]> * address minor nits Signed-off-by: Rama Chavali <[email protected]>
1 parent e232fb3 commit 3ad2b0a

28 files changed

+589
-164
lines changed

pilot/pkg/bootstrap/server.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import (
5252
"istio.io/istio/pilot/pkg/networking/plugin"
5353
envoyv2 "istio.io/istio/pilot/pkg/proxy/envoy/v2"
5454
"istio.io/istio/pilot/pkg/serviceregistry/aggregate"
55+
"istio.io/istio/pilot/pkg/serviceregistry/external"
5556
kubecontroller "istio.io/istio/pilot/pkg/serviceregistry/kube/controller"
5657
"istio.io/istio/pkg/config/constants"
5758
"istio.io/istio/pkg/config/schemas"
@@ -126,6 +127,7 @@ type Server struct {
126127
incrementalMcpOptions *coredatamodel.Options
127128
mcpOptions *coredatamodel.Options
128129
certController *chiron.WebhookController
130+
serviceEntryStore *external.ServiceEntryStore
129131
}
130132

131133
var podNamespaceVar = env.RegisterStringVar("POD_NAMESPACE", "", "")
@@ -248,6 +250,12 @@ func (s *Server) initDiscoveryService(args *PilotArgs) error {
248250
s.kubeRegistry.XDSUpdater = s.EnvoyXdsServer
249251
}
250252

253+
// TODO: Split initDiscoveryService method in to createDiscoveryServer and initDiscoveryService so that, this special
254+
// handling is not needed. Because of dependency ordering problem, we need to set this explicitly here.
255+
if s.serviceEntryStore != nil {
256+
s.serviceEntryStore.XdsUpdater = s.EnvoyXdsServer
257+
}
258+
251259
if s.mcpOptions != nil {
252260
s.mcpOptions.XDSUpdater = s.EnvoyXdsServer
253261
}
@@ -502,7 +510,7 @@ func (s *Server) initEventHandlers() error {
502510
}
503511

504512
instanceHandler := func(si *model.ServiceInstance, _ model.Event) {
505-
// TODO: This is an incomplete code. This code path is called for service entries, consul, etc.
513+
// TODO: This is an incomplete code. This code path is called for consul, etc.
506514
// In all cases, this is simply an instance update and not a config update. So, we need to update
507515
// EDS in all proxies, and do a full config push for the instance that just changed (add/update only).
508516
s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{
@@ -520,10 +528,10 @@ func (s *Server) initEventHandlers() error {
520528
if s.configController != nil {
521529
// TODO: changes should not trigger a full recompute of LDS/RDS/CDS/EDS
522530
// (especially mixerclient HTTP and quota)
523-
configHandler := func(c model.Config, _ model.Event) {
531+
configHandler := func(old, curr model.Config, _ model.Event) {
524532
pushReq := &model.PushRequest{
525533
Full: true,
526-
ConfigTypesUpdated: map[string]struct{}{c.Type: {}},
534+
ConfigTypesUpdated: map[string]struct{}{curr.Type: {}},
527535
}
528536
s.EnvoyXdsServer.ConfigUpdate(pushReq)
529537
}

pilot/pkg/bootstrap/servicecontroller.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,11 @@ func (s *Server) initServiceControllers(args *PilotArgs) error {
6161
}
6262
}
6363

64-
serviceEntryStore := external.NewServiceDiscovery(s.configController, s.istioConfigStore)
64+
serviceEntryStore := external.NewServiceDiscovery(s.configController, s.istioConfigStore, s.EnvoyXdsServer)
6565
serviceControllers.AddRegistry(serviceEntryStore)
6666

6767
s.ServiceController = serviceControllers
68+
s.serviceEntryStore = serviceEntryStore
6869

6970
// Defer running of the service controllers.
7071
s.addStartFunc(func(stop <-chan struct{}) error {

pilot/pkg/config/aggregate/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func (cr *storeCache) HasSynced() bool {
171171
return true
172172
}
173173

174-
func (cr *storeCache) RegisterEventHandler(typ string, handler func(model.Config, model.Event)) {
174+
func (cr *storeCache) RegisterEventHandler(typ string, handler func(model.Config, model.Config, model.Event)) {
175175
for _, cache := range cr.caches {
176176
if _, exists := cache.ConfigDescriptor().GetByType(typ); exists {
177177
cache.RegisterEventHandler(typ, handler)

pilot/pkg/config/aggregate/config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func TestAggregateStoreCache(t *testing.T) {
242242
t.Run("it registers an event handler", func(t *testing.T) {
243243
g := gomega.NewGomegaWithT(t)
244244

245-
cacheStore.RegisterEventHandler("some-config", func(model.Config, model.Event) {})
245+
cacheStore.RegisterEventHandler("some-config", func(model.Config, model.Config, model.Event) {})
246246

247247
typ, h := storeOne.RegisterEventHandlerArgsForCall(0)
248248
g.Expect(typ).To(gomega.Equal("some-config"))

pilot/pkg/config/aggregate/fakes/config_store_cache.gen.go

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pilot/pkg/config/coredatamodel/controller.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type Controller struct {
6363
configStore map[string]map[string]map[string]*model.Config
6464
descriptorsByCollection map[string]schema.Instance
6565
options *Options
66-
eventHandlers map[string][]func(model.Config, model.Event)
66+
eventHandlers map[string][]func(model.Config, model.Config, model.Event)
6767
ledger ledger.Ledger
6868
supportedConfig schema.Set
6969

@@ -93,7 +93,7 @@ func NewController(options *Options) CoreDataModel {
9393
configStore: make(map[string]map[string]map[string]*model.Config),
9494
options: options,
9595
descriptorsByCollection: descriptorsByMessageName,
96-
eventHandlers: make(map[string][]func(model.Config, model.Event)),
96+
eventHandlers: make(map[string][]func(model.Config, model.Config, model.Event)),
9797
synced: synced,
9898
ledger: options.ConfigLedger,
9999
supportedConfig: supportedCfg,
@@ -247,7 +247,7 @@ func (c *Controller) HasSynced() bool {
247247
}
248248

249249
// RegisterEventHandler registers a handler using the type as a key
250-
func (c *Controller) RegisterEventHandler(typ string, handler func(model.Config, model.Event)) {
250+
func (c *Controller) RegisterEventHandler(typ string, handler func(model.Config, model.Config, model.Event)) {
251251
c.eventHandlers[typ] = append(c.eventHandlers[typ], handler)
252252
}
253253

@@ -296,10 +296,10 @@ func (c *Controller) sync(collection string) {
296296
func (c *Controller) serviceEntryEvents(currentStore, prevStore map[string]map[string]*model.Config) {
297297
dispatch := func(model model.Config, event model.Event) {}
298298
if handlers, ok := c.eventHandlers[schemas.ServiceEntry.Type]; ok {
299-
dispatch = func(model model.Config, event model.Event) {
300-
log.Debugf("MCP event dispatch: key=%v event=%v", model.Key(), event.String())
299+
dispatch = func(config model.Config, event model.Event) {
300+
log.Debugf("MCP event dispatch: key=%v event=%v", config.Key(), event.String())
301301
for _, handler := range handlers {
302-
handler(model, event)
302+
handler(model.Config{}, config, event)
303303
}
304304
}
305305
}

pilot/pkg/config/coredatamodel/controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,7 @@ func TestEventHandler(t *testing.T) {
639639
model.EventUpdate: {},
640640
model.EventDelete: {},
641641
}
642-
controller.RegisterEventHandler(schemas.ServiceEntry.Type, func(m model.Config, e model.Event) {
642+
controller.RegisterEventHandler(schemas.ServiceEntry.Type, func(_, m model.Config, e model.Event) {
643643
gotEvents[e][makeName(m.Namespace, m.Name)] = m
644644
})
645645

pilot/pkg/config/coredatamodel/discovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func NewMCPDiscovery(controller CoreDataModel, options *DiscoveryOptions) *MCPDi
6666
cacheByHostName: make(map[host.Name][]*model.ServiceInstance),
6767
cacheServices: make(map[string]*model.Service),
6868
}
69-
discovery.RegisterEventHandler(schemas.SyntheticServiceEntry.Type, func(config model.Config, event model.Event) {
69+
discovery.RegisterEventHandler(schemas.SyntheticServiceEntry.Type, func(_, config model.Config, event model.Event) {
7070
discovery.HandleCacheEvents(config, event)
7171
})
7272
return discovery

pilot/pkg/config/coredatamodel/syntheticserviceentrycontroller.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type SyntheticServiceEntryController struct {
4545
configStoreMu sync.RWMutex
4646
// keys [namespace][name]
4747
configStore map[string]map[string]*model.Config
48-
eventHandler func(model.Config, model.Event)
48+
eventHandler func(model.Config, model.Config, model.Event)
4949
synced uint32
5050
*Options
5151
}
@@ -126,16 +126,15 @@ func (c *SyntheticServiceEntryController) HasSynced() bool {
126126

127127
func (c *SyntheticServiceEntryController) dispatch(config model.Config, event model.Event) {
128128
if c.eventHandler != nil {
129-
c.eventHandler(config, event)
129+
c.eventHandler(model.Config{}, config, event)
130130
}
131131
}
132132

133133
// RegisterEventHandler registers a handler using the type as a key
134-
func (c *SyntheticServiceEntryController) RegisterEventHandler(typ string, handler func(model.Config, model.Event)) {
134+
func (c *SyntheticServiceEntryController) RegisterEventHandler(typ string, handler func(model.Config, model.Config, model.Event)) {
135135
// TODO: investigate why it is called more than one
136136
if c.eventHandler == nil {
137137
c.eventHandler = handler
138-
139138
}
140139
}
141140

pilot/pkg/config/kube/crd/controller/controller.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -162,11 +162,11 @@ func (c *controller) addInformer(schema schema.Instance, namespace string, resyn
162162

163163
// notify is the first handler in the handler chain.
164164
// Returning an error causes repeated execution of the entire chain.
165-
func (c *controller) notify(obj interface{}, event model.Event) error {
165+
func (c *controller) notify(old, curr interface{}, event model.Event) error {
166166
if !c.HasSynced() {
167167
return errors.New("waiting till full synchronization")
168168
}
169-
_, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
169+
_, err := cache.DeletionHandlingMetaNamespaceKeyFunc(curr)
170170
if err != nil {
171171
log.Infof("Error retrieving key: %v", err)
172172
}
@@ -197,7 +197,7 @@ func (c *controller) createInformer(
197197
return
198198
}
199199
incrementEvent(otype, "add")
200-
c.queue.Push(kube.NewTask(handler.Apply, obj, model.EventAdd))
200+
c.queue.Push(kube.NewTask(handler.Apply, nil, obj, model.EventAdd))
201201
},
202202
UpdateFunc: func(old, cur interface{}) {
203203
if err := vf(cur); err != nil {
@@ -206,14 +206,14 @@ func (c *controller) createInformer(
206206
}
207207
if !reflect.DeepEqual(old, cur) {
208208
incrementEvent(otype, "update")
209-
c.queue.Push(kube.NewTask(handler.Apply, cur, model.EventUpdate))
209+
c.queue.Push(kube.NewTask(handler.Apply, old, cur, model.EventUpdate))
210210
} else {
211211
incrementEvent(otype, "updatesame")
212212
}
213213
},
214214
DeleteFunc: func(obj interface{}) {
215215
incrementEvent(otype, "delete")
216-
c.queue.Push(kube.NewTask(handler.Apply, obj, model.EventDelete))
216+
c.queue.Push(kube.NewTask(handler.Apply, nil, obj, model.EventDelete))
217217
},
218218
})
219219

@@ -236,19 +236,27 @@ func incrementEvent(kind, event string) {
236236
k8sEvents.With(typeTag.Value(kind), eventTag.Value(event)).Increment()
237237
}
238238

239-
func (c *controller) RegisterEventHandler(typ string, f func(model.Config, model.Event)) {
239+
func (c *controller) RegisterEventHandler(typ string, f func(model.Config, model.Config, model.Event)) {
240240
s, exists := c.ConfigDescriptor().GetByType(typ)
241241
if !exists {
242242
return
243243
}
244-
c.kinds[typ].handler.Append(func(object interface{}, ev model.Event) error {
245-
item, ok := object.(crd.IstioObject)
244+
c.kinds[typ].handler.Append(func(old, curr interface{}, ev model.Event) error {
245+
curritem, ok := curr.(crd.IstioObject)
246246
if ok {
247-
config, err := crd.ConvertObject(s, item, c.client.domainSuffix)
247+
config, err := crd.ConvertObject(s, curritem, c.client.domainSuffix)
248248
if err != nil {
249-
log.Warnf("error translating object for schema %#v : %v\n Object:\n%#v", s, err, object)
249+
log.Warnf("error translating object for schema %#v : %v\n Object:\n%#v", s, err, curr)
250250
} else {
251-
f(*config, ev)
251+
olditem, ok := old.(crd.IstioObject)
252+
oldconfig := &model.Config{}
253+
if ok {
254+
oldconfig, err = crd.ConvertObject(s, olditem, c.client.domainSuffix)
255+
if err != nil {
256+
log.Warnf("error translating object for schema %#v : %v\n Object:\n%#v", s, err, old)
257+
}
258+
}
259+
f(*oldconfig, *config, ev)
252260
}
253261
}
254262
return nil

pilot/pkg/config/kube/ingress/controller.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,21 +100,21 @@ func NewController(client kubernetes.Interface, mesh *meshconfig.MeshConfig,
100100
informer.AddEventHandler(
101101
cache.ResourceEventHandlerFuncs{
102102
AddFunc: func(obj interface{}) {
103-
queue.Push(kube.NewTask(handler.Apply, obj, model.EventAdd))
103+
queue.Push(kube.NewTask(handler.Apply, nil, obj, model.EventAdd))
104104
},
105105
UpdateFunc: func(old, cur interface{}) {
106106
if !reflect.DeepEqual(old, cur) {
107-
queue.Push(kube.NewTask(handler.Apply, cur, model.EventUpdate))
107+
queue.Push(kube.NewTask(handler.Apply, old, cur, model.EventUpdate))
108108
}
109109
},
110110
DeleteFunc: func(obj interface{}) {
111-
queue.Push(kube.NewTask(handler.Apply, obj, model.EventDelete))
111+
queue.Push(kube.NewTask(handler.Apply, nil, obj, model.EventDelete))
112112
},
113113
})
114114

115115
// first handler in the chain blocks until the cache is fully synchronized
116116
// it does this by returning an error to the chain handler
117-
handler.Append(func(obj interface{}, event model.Event) error {
117+
handler.Append(func(_, obj interface{}, event model.Event) error {
118118
if !informer.HasSynced() {
119119
return errors.New("waiting till full synchronization")
120120
}
@@ -134,9 +134,9 @@ func NewController(client kubernetes.Interface, mesh *meshconfig.MeshConfig,
134134
}
135135
}
136136

137-
func (c *controller) RegisterEventHandler(typ string, f func(model.Config, model.Event)) {
138-
c.handler.Append(func(obj interface{}, event model.Event) error {
139-
ingress, ok := obj.(*extensionsv1beta1.Ingress)
137+
func (c *controller) RegisterEventHandler(typ string, f func(model.Config, model.Config, model.Event)) {
138+
c.handler.Append(func(_, curr interface{}, event model.Event) error {
139+
ingress, ok := curr.(*extensionsv1beta1.Ingress)
140140
if !ok || !shouldProcessIngress(c.mesh, ingress) {
141141
return nil
142142
}
@@ -151,7 +151,7 @@ func (c *controller) RegisterEventHandler(typ string, f func(model.Config, model
151151
// An updated ingress may also trigger an Add or Delete for one of its constituent sub-rules.
152152
switch typ {
153153
case schemas.VirtualService.Type:
154-
f(model.Config{
154+
f(model.Config{}, model.Config{
155155
ConfigMeta: model.ConfigMeta{
156156
Type: typ,
157157
},

pilot/pkg/config/kube/ingress/status.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func NewStatusSyncer(mesh *meshconfig.MeshConfig,
121121
log.Infof("I am the new status update leader")
122122
go st.queue.Run(ctx.Done())
123123
err := wait.PollUntil(updateInterval, func() (bool, error) {
124-
st.queue.Push(kube.NewTask(st.handler.Apply, "Start leading", model.EventUpdate))
124+
st.queue.Push(kube.NewTask(st.handler.Apply, "", "Start leading", model.EventUpdate))
125125
return false, nil
126126
}, ctx.Done())
127127

@@ -171,7 +171,7 @@ func NewStatusSyncer(mesh *meshconfig.MeshConfig,
171171
st.elector = le
172172

173173
// Register handler at the beginning
174-
handler.Append(func(obj interface{}, event model.Event) error {
174+
handler.Append(func(old, curr interface{}, event model.Event) error {
175175
addrs, err := st.runningAddresses(ingressNamespace)
176176
if err != nil {
177177
return err

pilot/pkg/config/memory/controller.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func NewController(cs model.ConfigStore) model.ConfigStoreCache {
3737
return out
3838
}
3939

40-
func (c *controller) RegisterEventHandler(typ string, f func(model.Config, model.Event)) {
40+
func (c *controller) RegisterEventHandler(typ string, f func(model.Config, model.Config, model.Event)) {
4141
c.monitor.AppendEventHandler(typ, f)
4242
}
4343

@@ -77,8 +77,10 @@ func (c *controller) Create(config model.Config) (revision string, err error) {
7777
}
7878

7979
func (c *controller) Update(config model.Config) (newRevision string, err error) {
80+
oldconfig := c.configStore.Get(config.Type, config.Name, config.Namespace)
8081
if newRevision, err = c.configStore.Update(config); err == nil {
8182
c.monitor.ScheduleProcessEvent(ConfigEvent{
83+
old: *oldconfig,
8284
config: config,
8385
event: model.EventUpdate,
8486
})

0 commit comments

Comments
 (0)