forked from microsoft/retina
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdaemon.go
348 lines (304 loc) · 12.4 KB
/
daemon.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package legacy
import (
"fmt"
"net/http"
"os"
"strings"
"time"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
ctrl "sigs.k8s.io/controller-runtime"
crcache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
kcfg "sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/healthz"
crmgr "sigs.k8s.io/controller-runtime/pkg/manager"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"github.com/go-logr/zapr"
retinav1alpha1 "github.com/microsoft/retina/crd/api/v1alpha1"
"github.com/microsoft/retina/internal/buildinfo"
"github.com/microsoft/retina/pkg/config"
controllercache "github.com/microsoft/retina/pkg/controllers/cache"
mcc "github.com/microsoft/retina/pkg/controllers/daemon/metricsconfiguration"
namespacecontroller "github.com/microsoft/retina/pkg/controllers/daemon/namespace"
nc "github.com/microsoft/retina/pkg/controllers/daemon/node"
pc "github.com/microsoft/retina/pkg/controllers/daemon/pod"
kec "github.com/microsoft/retina/pkg/controllers/daemon/retinaendpoint"
sc "github.com/microsoft/retina/pkg/controllers/daemon/service"
"github.com/microsoft/retina/pkg/enricher"
"github.com/microsoft/retina/pkg/log"
cm "github.com/microsoft/retina/pkg/managers/controllermanager"
"github.com/microsoft/retina/pkg/managers/filtermanager"
"github.com/microsoft/retina/pkg/metrics"
mm "github.com/microsoft/retina/pkg/module/metrics"
"github.com/microsoft/retina/pkg/pubsub"
"github.com/microsoft/retina/pkg/telemetry"
)
const (
	// logFileName is the file name used when file logging is enabled in LogOpts.
	logFileName = "retina.log"
	// heartbeatInterval is how often the telemetry heartbeat fires.
	heartbeatInterval = 15 * time.Minute
	// nodeNameEnvKey is the env var (set via the downward API) holding this node's name.
	nodeNameEnvKey = "NODE_NAME"
	// nodeIPEnvKey is the env var (set via the downward API) holding this node's IP.
	nodeIPEnvKey = "NODE_IP"
)
var (
	// healthzChecker is wired into the manager's healthz/readyz endpoints by
	// Start: a simple ping when pod-level metrics are disabled, or a check
	// backed by the metrics module status when they are enabled.
	healthzChecker healthz.Checker
)

// scheme registers the API object types the controller-runtime client can decode.
var scheme = k8sruntime.NewScheme()
func init() {
//+kubebuilder:scaffold:scheme
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(retinav1alpha1.AddToScheme(scheme))
}
// Daemon holds the command-line configuration needed to run the Retina agent
// with the legacy control plane. Construct it with NewDaemon and run it with
// Start.
type Daemon struct {
	metricsAddr          string // bind address for the controller-runtime metrics server
	probeAddr            string // bind address for the healthz/readyz probe endpoint
	enableLeaderElection bool   // whether the controller-runtime manager performs leader election
	configFile           string // path to the Retina daemon configuration file
}
// NewDaemon returns a Daemon configured with the given metrics and probe bind
// addresses, configuration file path, and leader-election setting.
func NewDaemon(metricsAddr, probeAddr, configFile string, enableLeaderElection bool) *Daemon {
	var d Daemon
	d.metricsAddr = metricsAddr
	d.probeAddr = probeAddr
	d.enableLeaderElection = enableLeaderElection
	d.configFile = configFile
	return &d
}
// Start runs the Retina daemon with the legacy control plane. It loads the
// daemon configuration, builds a Kubernetes REST config, initializes logging,
// telemetry and metrics, wires up the controller-runtime manager (with
// node-scoped server-side cache filtering when pod-level metrics run in local
// context), starts the Retina controller manager, and then blocks in
// mgr.Start until the process receives a termination signal.
//
// Setup failures are reported either by the returned error or, for several
// early-initialization steps, by a panic so that telemetry.TrackPanic (when
// telemetry is enabled) can report them before the process dies.
func (d *Daemon) Start() error {
	fmt.Printf("starting Retina daemon with legacy control plane %v\n", buildinfo.Version)

	if buildinfo.ApplicationInsightsID != "" {
		telemetry.InitAppInsights(buildinfo.ApplicationInsightsID, buildinfo.Version)
		defer telemetry.ShutdownAppInsights()
		defer telemetry.TrackPanic()
	}

	daemonConfig, err := config.GetConfig(d.configFile)
	if err != nil {
		panic(err)
	}

	fmt.Println("init client-go")
	var cfg *rest.Config
	if kubeconfig := os.Getenv("KUBECONFIG"); kubeconfig != "" {
		fmt.Println("KUBECONFIG set, using kubeconfig: ", kubeconfig)
		cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			// Fixed: this previously reused the "creating controller-runtime
			// manager" wrap message from the crmgr.New error path below, which
			// misattributed kubeconfig parse failures.
			return fmt.Errorf("building REST config from KUBECONFIG: %w", err)
		}
	} else {
		cfg, err = kcfg.GetConfig()
		if err != nil {
			panic(err)
		}
	}
	fmt.Println("api server: ", cfg.Host)

	fmt.Println("init logger")
	zl, err := log.SetupZapLogger(&log.LogOpts{
		Level:                 daemonConfig.LogLevel,
		File:                  false,
		FileName:              logFileName,
		MaxFileSizeMB:         100, //nolint:gomnd // defaults
		MaxBackups:            3,   //nolint:gomnd // defaults
		MaxAgeDays:            30,  //nolint:gomnd // defaults
		ApplicationInsightsID: buildinfo.ApplicationInsightsID,
		EnableTelemetry:       daemonConfig.EnableTelemetry,
	},
		zap.String("version", buildinfo.Version),
		zap.String("apiserver", cfg.Host),
		zap.String("plugins", strings.Join(daemonConfig.EnabledPlugin, `,`)),
		zap.String("data aggregation level", daemonConfig.DataAggregationLevel.String()),
	)
	if err != nil {
		panic(err)
	}
	defer zl.Close()
	mainLogger := zl.Named("main").Sugar()

	// Allow the current process to lock memory for eBPF resources.
	// OS specific implementation.
	// This is a no-op on Windows.
	if err = d.RemoveMemlock(); err != nil {
		mainLogger.Fatal("failed to remove memlock", zap.Error(err))
	}

	metrics.InitializeMetrics()

	// NOTE(review): passing a zap.Field to SugaredLogger.Info causes it to be
	// rendered via fmt rather than as a structured field — consider Infow.
	mainLogger.Info(zap.String("data aggregation level", daemonConfig.DataAggregationLevel.String()))

	var tel telemetry.Telemetry
	if daemonConfig.EnableTelemetry {
		// Telemetry without an instrumentation key is a misconfiguration;
		// fail loudly rather than silently dropping data.
		if buildinfo.ApplicationInsightsID == "" {
			panic("telemetry enabled, but ApplicationInsightsID is empty")
		}
		mainLogger.Info("telemetry enabled", zap.String("applicationInsightsID", buildinfo.ApplicationInsightsID))
		tel, err = telemetry.NewAppInsightsTelemetryClient("retina-agent", map[string]string{
			"version":   buildinfo.Version,
			"apiserver": cfg.Host,
			"plugins":   strings.Join(daemonConfig.EnabledPlugin, `,`),
		})
		if err != nil {
			mainLogger.Error("failed to create telemetry client", zap.Error(err))
			return fmt.Errorf("error when creating telemetry client: %w", err)
		}
	} else {
		mainLogger.Info("telemetry disabled")
		tel = telemetry.NewNoopTelemetry()
	}

	// Create a manager for controller-runtime
	mgrOption := crmgr.Options{
		Scheme: scheme,
		Metrics: metricsserver.Options{
			BindAddress: d.metricsAddr,
		},
		HealthProbeBindAddress: d.probeAddr,
		LeaderElection:         d.enableLeaderElection,
		LeaderElectionID:       "ecaf1259.retina.sh",
	}

	// Local context has its meaning only when pod level(advanced) metrics is enabled.
	if daemonConfig.EnablePodLevel && !daemonConfig.RemoteContext {
		mainLogger.Info("Remote context is disabled, only pods deployed on the same node as retina-agent will be monitored")
		// the new cache sets Selector options on the Manager cache which are used
		// to perform *server-side* filtering of the cached objects. This is very important
		// for high node/pod count clusters, as it keeps us from watching objects at the
		// whole cluster scope when we are only interested in the Node's scope.
		nodeName := os.Getenv(nodeNameEnvKey)
		if nodeName == "" {
			mainLogger.Fatal("failed to get node name from environment variable", zap.String("node name env key", nodeNameEnvKey))
		}
		podNodeNameSelector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName})
		// Ignore hostnetwork pods which share the same IP with the node and pods on the same node.
		// Unlike spec.nodeName, field label "spec.hostNetwork" is not supported, and as a workaround,
		// We use status.podIP to filter out hostnetwork pods.
		// https://github.com/kubernetes/kubernetes/blob/41da26dbe15207cbe5b6c36b48a31d2cd3344123/pkg/apis/core/v1/conversion.go#L36
		nodeIP := os.Getenv(nodeIPEnvKey)
		if nodeIP == "" {
			mainLogger.Fatal("failed to get node IP from environment variable", zap.String("node IP env key", nodeIPEnvKey))
		}
		podNodeIPNotMatchSelector := fields.OneTermNotEqualSelector("status.podIP", nodeIP)
		podSelector := fields.AndSelectors(podNodeNameSelector, podNodeIPNotMatchSelector)

		mainLogger.Info("pod selector when remote context is disabled", zap.String("pod selector", podSelector.String()))
		mgrOption.Cache = crcache.Options{
			ByObject: map[client.Object]crcache.ByObject{
				&corev1.Pod{}: {
					Field: podSelector,
				},
			},
		}
	}

	mgr, err := crmgr.New(cfg, mgrOption)
	if err != nil {
		mainLogger.Error("Unable to start manager", zap.Error(err))
		return fmt.Errorf("creating controller-runtime manager: %w", err)
	}

	//+kubebuilder:scaffold:builder

	// k8s Client used for informers
	cl := kubernetes.NewForConfigOrDie(mgr.GetConfig())

	// Server version is informational only; a discovery failure is logged but
	// does not abort startup.
	serverVersion, err := cl.Discovery().ServerVersion()
	if err != nil {
		mainLogger.Error("failed to get Kubernetes server version: ", zap.Error(err))
	} else {
		mainLogger.Infof("Kubernetes server version: %v", serverVersion)
	}

	// Setup RetinaEndpoint controller.
	// TODO(mainred): This is to temporarily create a cache and pubsub for RetinaEndpoint, need to refactor this.
	ctx := ctrl.SetupSignalHandler()
	ctrl.SetLogger(zapr.NewLogger(zl.Logger.Named("controller-runtime")))

	if daemonConfig.EnablePodLevel {
		pubSub := pubsub.New()
		controllerCache := controllercache.New(pubSub)
		enrich := enricher.New(ctx, controllerCache)
		//nolint:govet // shadowing this err is fine
		fm, err := filtermanager.Init(5) //nolint:gomnd // defaults
		if err != nil {
			mainLogger.Fatal("unable to create filter manager", zap.Error(err))
		}
		defer fm.Stop() //nolint:errcheck // best effort
		enrich.Run()
		metricsModule := mm.InitModule(ctx, daemonConfig, pubSub, enrich, fm, controllerCache)

		// In local context the Pod controller feeds the cache; in remote
		// context RetinaEndpoint CRDs (if enabled) do instead.
		if !daemonConfig.RemoteContext {
			mainLogger.Info("Initializing Pod controller")
			podController := pc.New(mgr.GetClient(), controllerCache)
			if err := podController.SetupWithManager(mgr); err != nil {
				mainLogger.Fatal("unable to create PodController", zap.Error(err))
			}
		} else if daemonConfig.EnableRetinaEndpoint {
			mainLogger.Info("RetinaEndpoint is enabled")
			mainLogger.Info("Initializing RetinaEndpoint controller")
			retinaEndpointController := kec.New(mgr.GetClient(), controllerCache)
			if err := retinaEndpointController.SetupWithManager(mgr); err != nil {
				mainLogger.Fatal("unable to create retinaEndpointController", zap.Error(err))
			}
		}

		mainLogger.Info("Initializing Node controller")
		nodeController := nc.New(mgr.GetClient(), controllerCache)
		if err := nodeController.SetupWithManager(mgr); err != nil {
			mainLogger.Fatal("unable to create nodeController", zap.Error(err))
		}

		mainLogger.Info("Initializing Service controller")
		svcController := sc.New(mgr.GetClient(), controllerCache)
		if err := svcController.SetupWithManager(mgr); err != nil {
			mainLogger.Fatal("unable to create svcController", zap.Error(err))
		}

		// Metrics configuration comes from either namespace annotations or
		// the MetricsConfiguration CRD, never both.
		if daemonConfig.EnableAnnotations {
			mainLogger.Info("Initializing MetricsConfig namespaceController")
			namespaceController := namespacecontroller.New(mgr.GetClient(), controllerCache, metricsModule)
			if err := namespaceController.SetupWithManager(mgr); err != nil {
				mainLogger.Fatal("unable to create namespaceController", zap.Error(err))
			}
			go namespaceController.Start(ctx)
		} else {
			mainLogger.Info("Initializing MetricsConfig controller")
			metricsConfigController := mcc.New(mgr.GetClient(), mgr.GetScheme(), metricsModule)
			if err := metricsConfigController.SetupWithManager(mgr); err != nil {
				mainLogger.Fatal("unable to create metricsConfigController", zap.Error(err))
			}
		}

		// Define a custom health check for advanced metrics
		healthzChecker = healthz.CheckHandler{
			Checker: healthz.Checker(func(req *http.Request) error {
				_, err := metricsModule.Status()
				if err != nil {
					// Fixed typo in log message: "fr" -> "for".
					mainLogger.Error("failed to get metrics module status for advanced metrics", zap.Error(err))
					return err
				}
				return nil
			}),
		}.Checker
	} else {
		// Advanced Metric not enabled, Ping healthcheck
		healthzChecker = healthz.Ping
	}

	controllerMgr, err := cm.NewControllerManager(daemonConfig, cl, tel)
	if err != nil {
		mainLogger.Fatal("Failed to create controller manager", zap.Error(err))
	}
	if err := controllerMgr.Init(ctx); err != nil {
		mainLogger.Fatal("Failed to initialize controller manager", zap.Error(err))
	}
	// Stop is best effort. If it fails, we still want to stop the main process.
	// This is needed for graceful shutdown of Retina plugins.
	// Do it in the main thread as graceful shutdown is important.
	defer controllerMgr.Stop(ctx)

	// start heartbeat goroutine for application insights
	go tel.Heartbeat(ctx, heartbeatInterval)

	// Start controller manager, which will start http server and plugin manager.
	go controllerMgr.Start(ctx)
	mainLogger.Info("Started controller manager")

	// Set health checks according to retina configuration
	if err := mgr.AddHealthzCheck("healthz", healthzChecker); err != nil {
		mainLogger.Error("unable to set up custom health check", zap.Error(err))
		os.Exit(1)
	}
	if err := mgr.AddReadyzCheck("readyz", healthzChecker); err != nil {
		mainLogger.Error("unable to set up custom ready check", zap.Error(err))
		os.Exit(1)
	}

	// Start all registered controllers. This will block until container receives SIGTERM.
	if err := mgr.Start(ctx); err != nil {
		mainLogger.Fatal("unable to start manager", zap.Error(err))
	}

	mainLogger.Info("Network observability exiting. Till next time!")
	return nil
}