Skip to content

Commit 2aa121e

Browse files
committed
apiexport: watch APIExportEndpointSlice dynamically
On-behalf-of: SAP <[email protected]> Signed-off-by: Marvin Beckers <[email protected]>
1 parent 19eb719 commit 2aa121e

File tree

3 files changed

+429
-184
lines changed

3 files changed

+429
-184
lines changed
Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
/*
2+
Copyright 2025 The KCP Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package provider
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"sync"
23+
"time"
24+
25+
"github.com/go-logr/logr"
26+
"golang.org/x/sync/errgroup"
27+
28+
"k8s.io/apimachinery/pkg/runtime"
29+
"k8s.io/client-go/kubernetes/scheme"
30+
"k8s.io/client-go/rest"
31+
toolscache "k8s.io/client-go/tools/cache"
32+
"k8s.io/client-go/tools/record"
33+
"k8s.io/klog/v2"
34+
"sigs.k8s.io/controller-runtime/pkg/cache"
35+
"sigs.k8s.io/controller-runtime/pkg/client"
36+
"sigs.k8s.io/controller-runtime/pkg/cluster"
37+
"sigs.k8s.io/controller-runtime/pkg/log"
38+
39+
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
40+
41+
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
42+
apisv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
43+
"github.com/kcp-dev/logicalcluster/v3"
44+
45+
mcpcache "github.com/kcp-dev/multicluster-provider/internal/cache"
46+
mcrecorder "github.com/kcp-dev/multicluster-provider/internal/events/recorder"
47+
)
48+
49+
var _ multicluster.Provider = &Provider{}
50+
var _ multicluster.ProviderRunnable = &Provider{}
51+
52+
type Provider struct {
53+
config *rest.Config
54+
scheme *runtime.Scheme
55+
cache mcpcache.WildcardCache
56+
object client.Object
57+
58+
log logr.Logger
59+
60+
lock sync.RWMutex
61+
aware multicluster.Aware
62+
clusters map[logicalcluster.Name]cluster.Cluster
63+
cancelFns map[logicalcluster.Name]context.CancelFunc
64+
65+
recorderProvider *mcrecorder.Provider
66+
}
67+
68+
// Options are the options for creating a new instance of the apiexport provider.
69+
type Options struct {
70+
// Scheme is the scheme to use for the provider. If this is nil, it defaults
71+
// to the client-go scheme.
72+
Scheme *runtime.Scheme
73+
74+
// WildcardCache is the wildcard cache to use for the provider. If this is
75+
// nil, a new wildcard cache will be created for the given rest.Config.
76+
WildcardCache mcpcache.WildcardCache
77+
78+
// ObjectToWatch is the object type that the provider watches via a /clusters/*
79+
// wildcard endpoint to extract information about logical clusters joining and
80+
// leaving the "fleet" of (logical) clusters in kcp. If this is nil, it defaults
81+
// to [apisv1alpha1.APIBinding]. This might be useful when using this provider
82+
// against custom virtual workspaces that are not the APIExport one but share
83+
// the same endpoint semantics.
84+
ObjectToWatch client.Object
85+
86+
// Log is the logger used to write any logs.
87+
Log *logr.Logger
88+
89+
// makeBroadcaster allows deferring the creation of the broadcaster to
90+
// avoid leaking goroutines if we never call Start on this manager. It also
91+
// returns whether or not this is an "owned" broadcaster, and as such should be
92+
// stopped with the manager.
93+
makeBroadcaster mcrecorder.EventBroadcasterProducer
94+
}
95+
96+
// New creates a new kcp virtual workspace provider. The provided [rest.Config]
97+
// must point to a virtual workspace apiserver base path, i.e. up to but without
98+
// the '/clusters/*' suffix. This information can be extracted from the APIExport
99+
// status (deprecated) or an APIExportEndpointSlice status.
100+
func New(cfg *rest.Config, options Options) (*Provider, error) {
101+
// Do the defaulting controller-runtime would do for those fields we need.
102+
if options.Scheme == nil {
103+
options.Scheme = scheme.Scheme
104+
}
105+
if options.WildcardCache == nil {
106+
var err error
107+
options.WildcardCache, err = mcpcache.NewWildcardCache(cfg, cache.Options{
108+
Scheme: options.Scheme,
109+
})
110+
if err != nil {
111+
return nil, fmt.Errorf("failed to create wildcard cache: %w", err)
112+
}
113+
}
114+
if options.ObjectToWatch == nil {
115+
options.ObjectToWatch = &apisv1alpha1.APIBinding{}
116+
}
117+
118+
if options.makeBroadcaster == nil {
119+
options.makeBroadcaster = func() (record.EventBroadcaster, bool) {
120+
return record.NewBroadcaster(), true
121+
}
122+
}
123+
if options.Log == nil {
124+
logger := log.Log.WithName("kcp-apiexport-internal-provider")
125+
options.Log = &logger
126+
}
127+
128+
eventClient, err := rest.HTTPClientFor(cfg)
129+
if err != nil {
130+
return nil, fmt.Errorf("failed to create HTTP client for events: %w", err)
131+
}
132+
133+
recorderProvider, err := mcrecorder.NewProvider(eventClient, options.Scheme, logr.Discard(), options.makeBroadcaster)
134+
if err != nil {
135+
return nil, err
136+
}
137+
138+
return &Provider{
139+
config: cfg,
140+
scheme: options.Scheme,
141+
cache: options.WildcardCache,
142+
object: options.ObjectToWatch,
143+
144+
log: *options.Log,
145+
146+
clusters: map[logicalcluster.Name]cluster.Cluster{},
147+
cancelFns: map[logicalcluster.Name]context.CancelFunc{},
148+
149+
recorderProvider: recorderProvider,
150+
}, nil
151+
}
152+
153+
// Start starts the provider and blocks.
154+
func (p *Provider) Start(ctx context.Context, aware multicluster.Aware) error {
155+
g, ctx := errgroup.WithContext(ctx)
156+
p.aware = aware
157+
158+
// Watch logical clusters and engage them as clusters in multicluster-runtime.
159+
inf, err := p.cache.GetInformer(ctx, p.object, cache.BlockUntilSynced(false))
160+
if err != nil {
161+
return fmt.Errorf("failed to get %T informer: %w", p.object, err)
162+
}
163+
shInf, _, _, err := p.cache.GetSharedInformer(p.object)
164+
if err != nil {
165+
return fmt.Errorf("failed to get shared informer: %w", err)
166+
}
167+
if _, err := inf.AddEventHandler(toolscache.ResourceEventHandlerFuncs{
168+
AddFunc: func(obj any) {
169+
cobj, ok := obj.(client.Object)
170+
if !ok {
171+
klog.Errorf("unexpected object type %T", obj)
172+
return
173+
}
174+
clusterName := logicalcluster.From(cobj)
175+
176+
// fast path: cluster exists already, there is nothing to do.
177+
p.lock.RLock()
178+
if _, ok := p.clusters[clusterName]; ok {
179+
p.lock.RUnlock()
180+
return
181+
}
182+
p.lock.RUnlock()
183+
184+
// slow path: take write lock to add a new cluster (unless it appeared in the meantime).
185+
p.lock.Lock()
186+
if _, ok := p.clusters[clusterName]; ok {
187+
p.lock.Unlock()
188+
return
189+
}
190+
191+
// create new scoped cluster.
192+
clusterCtx, cancel := context.WithCancel(ctx)
193+
cl, err := mcpcache.NewScopedCluster(p.config, clusterName, p.cache, p.scheme, p.recorderProvider)
194+
if err != nil {
195+
p.log.Error(err, "failed to create cluster", "cluster", clusterName)
196+
cancel()
197+
p.lock.Unlock()
198+
return
199+
}
200+
p.clusters[clusterName] = cl
201+
p.cancelFns[clusterName] = cancel
202+
p.lock.Unlock()
203+
204+
p.log.Info("engaging cluster", "cluster", clusterName)
205+
if err := p.aware.Engage(clusterCtx, clusterName.String(), cl); err != nil {
206+
p.log.Error(err, "failed to engage cluster", "cluster", clusterName)
207+
p.lock.Lock()
208+
cancel()
209+
if p.clusters[clusterName] == cl {
210+
delete(p.clusters, clusterName)
211+
delete(p.cancelFns, clusterName)
212+
}
213+
p.lock.Unlock()
214+
}
215+
},
216+
DeleteFunc: func(obj any) {
217+
cobj, ok := obj.(client.Object)
218+
if !ok {
219+
tombstone, ok := obj.(toolscache.DeletedFinalStateUnknown)
220+
if !ok {
221+
klog.Errorf("Couldn't get object from tombstone %#v", obj)
222+
return
223+
}
224+
cobj, ok = tombstone.Obj.(client.Object)
225+
if !ok {
226+
klog.Errorf("Tombstone contained object that is not expected %#v", obj)
227+
return
228+
}
229+
}
230+
231+
clusterName := logicalcluster.From(cobj)
232+
233+
// check if there is no object left in the index.
234+
keys, err := shInf.GetIndexer().IndexKeys(kcpcache.ClusterIndexName, clusterName.String())
235+
if err != nil {
236+
p.log.Error(err, "failed to get index keys", "cluster", clusterName)
237+
return
238+
}
239+
if len(keys) == 0 {
240+
p.lock.Lock()
241+
242+
// shut down individual event broadaster
243+
if err := p.recorderProvider.StopBroadcaster(clusterName.String()); err != nil {
244+
p.lock.Unlock()
245+
p.log.Error(err, "failed to stop event broadcaster", "cluster", clusterName)
246+
return
247+
}
248+
249+
cancel, ok := p.cancelFns[clusterName]
250+
if ok {
251+
p.log.Info("disengaging cluster", "cluster", clusterName)
252+
cancel()
253+
delete(p.cancelFns, clusterName)
254+
delete(p.clusters, clusterName)
255+
}
256+
p.lock.Unlock()
257+
}
258+
},
259+
}); err != nil {
260+
return fmt.Errorf("failed to add EventHandler: %w", err)
261+
}
262+
263+
g.Go(func() error { return p.cache.Start(ctx) })
264+
g.Go(func() error {
265+
// wait for context stop and try to shut down event broadcasters
266+
select {
267+
case <-ctx.Done():
268+
shutdownCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
269+
defer cancel()
270+
p.recorderProvider.Stop(shutdownCtx)
271+
default:
272+
}
273+
return nil
274+
})
275+
276+
syncCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
277+
defer cancel()
278+
279+
if _, err := p.cache.GetInformer(syncCtx, p.object, cache.BlockUntilSynced(true)); err != nil {
280+
return fmt.Errorf("failed to sync %T informer: %w", p.object, err)
281+
}
282+
283+
return g.Wait()
284+
}
285+
286+
// Get returns a [cluster.Cluster] by logical cluster name. Be aware that workspace paths do not work.
287+
func (p *Provider) Get(_ context.Context, name string) (cluster.Cluster, error) {
288+
p.lock.RLock()
289+
defer p.lock.RUnlock()
290+
if cl, ok := p.clusters[logicalcluster.Name(name)]; ok {
291+
return cl, nil
292+
}
293+
294+
return nil, multicluster.ErrClusterNotFound
295+
}
296+
297+
// GetWildcard returns the wildcard cache.
298+
func (p *Provider) GetWildcard() cache.Cache {
299+
return p.cache
300+
}
301+
302+
// IndexField indexes the given object by the given field on all engaged clusters, current and future.
303+
func (p *Provider) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
304+
return p.cache.IndexField(ctx, obj, field, extractValue)
305+
}

0 commit comments

Comments
 (0)