From 41d5d08686277a019c4cb3ee9f077c4ec880e37b Mon Sep 17 00:00:00 2001 From: Tomofumi Hayashi Date: Mon, 31 Jul 2023 23:49:16 +0900 Subject: [PATCH] Support readinessIndicator file in thick multus-daemon This change supports readinessIndicatorfile in multus-daemon and refines goroutine termination in case of signal with context. --- cmd/multus-daemon/main.go | 95 ++++++++++++++++++++----------- pkg/multus/multus.go | 17 +----- pkg/server/config/manager.go | 87 ++++++++++++++++++---------- pkg/server/config/manager_test.go | 12 ++-- pkg/server/server.go | 19 ++++--- pkg/types/conf.go | 13 +++++ 6 files changed, 153 insertions(+), 90 deletions(-) diff --git a/cmd/multus-daemon/main.go b/cmd/multus-daemon/main.go index be2c02988..298e62cef 100644 --- a/cmd/multus-daemon/main.go +++ b/cmd/multus-daemon/main.go @@ -23,8 +23,11 @@ import ( "io" "net/http" "os" + "os/signal" "os/user" "path/filepath" + "sync" + "syscall" "time" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -35,6 +38,7 @@ import ( srv "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server" "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/api" "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/server/config" + "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/types" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -54,32 +58,47 @@ func main() { os.Exit(4) } - configWatcherStopChannel := make(chan struct{}) configWatcherDoneChannel := make(chan struct{}) - serverStopChannel := make(chan struct{}) serverDoneChannel := make(chan struct{}) + multusConfigFile := "" + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) daemonConf, err := cniServerConfig(*configFilePath) if err != nil { os.Exit(1) } - if err := startMultusDaemon(daemonConf, serverStopChannel, serverDoneChannel); err != nil { - logging.Panicf("failed start the multus thick-plugin listener: %v", err) - os.Exit(3) - } - multusConf, err := config.ParseMultusConfig(*configFilePath) if err != nil { logging.Panicf("startMultusDaemon failed to load the multus configuration: %v", err) os.Exit(1) } + logging.Verbosef("multus-daemon started") + + if multusConf.ReadinessIndicatorFile != "" { + // Check readinessindicator file before daemon launch + logging.Verbosef("Readiness Indicator file check") + if err := types.GetReadinessIndicatorFile(multusConf.ReadinessIndicatorFile); err != nil { + _ = logging.Errorf("have you checked that your default network is ready? still waiting for readinessindicatorfile @ %v. pollimmediate error: %v", multusConf.ReadinessIndicatorFile, err) + os.Exit(1) + } + logging.Verbosef("Readiness Indicator file check done!") + } + + if err := startMultusDaemon(ctx, daemonConf, serverDoneChannel); err != nil { + logging.Panicf("failed start the multus thick-plugin listener: %v", err) + os.Exit(3) + } + // Wait until daemon ready + logging.Verbosef("API readiness check") if waitUntilAPIReady(daemonConf.SocketDir) != nil { logging.Panicf("failed to ready multus-daemon socket: %v", err) os.Exit(1) } + logging.Verbosef("API readiness check done!") // Generate multus CNI config from current CNI config if multusConf.MultusConfigFile == "auto" { @@ -111,39 +130,51 @@ func main() { } logging.Verbosef("Generated MultusCNI config: %s", generatedMultusConfig) - if err := configManager.PersistMultusConfig(generatedMultusConfig); err != nil { + multusConfigFile, err = configManager.PersistMultusConfig(generatedMultusConfig) + if err != nil { _ = logging.Errorf("failed to persist the multus configuration: %v", err) } - go func(stopChannel chan<- struct{}, doneChannel chan<- struct{}) { - if err := configManager.MonitorPluginConfiguration(configWatcherStopChannel, doneChannel); err != nil { + go func(ctx context.Context, doneChannel chan<- struct{}) { + if err := configManager.MonitorPluginConfiguration(ctx, doneChannel); err != nil { _ = logging.Errorf("error watching file: %v", err) } - }(configWatcherStopChannel, configWatcherDoneChannel) - - <-configWatcherDoneChannel + }(ctx, configWatcherDoneChannel) } else { if err := copyUserProvidedConfig(multusConf.MultusConfigFile, multusConf.CniConfigDir); err != nil { logging.Errorf("failed to copy the user provided configuration %s: %v", multusConf.MultusConfigFile, err) } } - serverDone := false - configWatcherDone := false - for { - select { - case <-configWatcherDoneChannel: - logging.Verbosef("ConfigWatcher done") - configWatcherDone = true - case <-serverDoneChannel: - logging.Verbosef("multus-server done.") - serverDone = true + signalCh := make(chan os.Signal, 16) + signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + for sig := range signalCh { + logging.Verbosef("caught %v, stopping...", sig) + cancel() } + }() - if serverDone && configWatcherDone { - return - } + var wg sync.WaitGroup + if multusConf.MultusConfigFile == "auto" { + wg.Add(1) + go func() { + <-configWatcherDoneChannel + logging.Verbosef("ConfigWatcher done") + logging.Verbosef("Delete old config @ %v", multusConfigFile) + os.Remove(multusConfigFile) + wg.Done() + }() } + + wg.Add(1) + go func() { + <-serverDoneChannel + logging.Verbosef("multus-server done.") + wg.Done() + }() + + wg.Wait() // never reached } @@ -157,7 +188,7 @@ func waitUntilAPIReady(socketPath string) error { }) } -func startMultusDaemon(daemonConfig *srv.ControllerNetConf, stopCh chan struct{}, done chan struct{}) error { +func startMultusDaemon(ctx context.Context, daemonConfig *srv.ControllerNetConf, done chan struct{}) error { if user, err := user.Current(); err != nil || user.Uid != "0" { return fmt.Errorf("failed to run multus-daemon with root: %v, now running in uid: %s", err, user.Uid) } @@ -172,11 +203,11 @@ func startMultusDaemon(daemonConfig *srv.ControllerNetConf, stopCh chan struct{} } if daemonConfig.MetricsPort != nil { - go utilwait.Until(func() { + go utilwait.UntilWithContext(ctx, func(ctx context.Context) { http.Handle("/metrics", promhttp.Handler()) logging.Debugf("metrics port: %d", *daemonConfig.MetricsPort) logging.Debugf("metrics: %s", http.ListenAndServe(fmt.Sprintf(":%d", *daemonConfig.MetricsPort), nil)) - }, 0, stopCh) + }, 0) } l, err := srv.GetListener(api.SocketPath(daemonConfig.SocketDir)) @@ -186,13 +217,13 @@ func startMultusDaemon(daemonConfig *srv.ControllerNetConf, stopCh chan struct{} server.SetKeepAlivesEnabled(false) go func() { - utilwait.Until(func() { + utilwait.UntilWithContext(ctx, func(ctx context.Context) { logging.Debugf("open for business") if err := server.Serve(l); err != nil { utilruntime.HandleError(fmt.Errorf("CNI server Serve() failed: %v", err)) } - }, 0, stopCh) - server.Shutdown(context.TODO()) + }, 0) + server.Shutdown(context.Background()) close(done) }() diff --git a/pkg/multus/multus.go b/pkg/multus/multus.go index 50af45f05..6238b1d8d 100644 --- a/pkg/multus/multus.go +++ b/pkg/multus/multus.go @@ -58,11 +58,6 @@ var ( releaseStatus = "" ) -var ( - pollDuration = 1000 * time.Millisecond - pollTimeout = 45 * time.Second -) - // PrintVersionString ... func PrintVersionString() string { return fmt.Sprintf("version:%s(%s%s), commit:%s, date:%s", version, gitTreeState, releaseStatus, commit, date) @@ -575,11 +570,7 @@ func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c } if n.ReadinessIndicatorFile != "" { - err := wait.PollImmediate(pollDuration, pollTimeout, func() (bool, error) { - _, err := os.Stat(n.ReadinessIndicatorFile) - return err == nil, nil - }) - if err != nil { + if err := types.GetReadinessIndicatorFile(n.ReadinessIndicatorFile); err != nil { return nil, cmdErr(k8sArgs, "have you checked that your default network is ready? still waiting for readinessindicatorfile @ %v. pollimmediate error: %v", n.ReadinessIndicatorFile, err) } } @@ -813,11 +804,7 @@ func CmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) er } if in.ReadinessIndicatorFile != "" { - err := wait.PollImmediate(pollDuration, pollTimeout, func() (bool, error) { - _, err := os.Stat(in.ReadinessIndicatorFile) - return err == nil, nil - }) - if err != nil { + if err := types.GetReadinessIndicatorFile(in.ReadinessIndicatorFile); err != nil { return cmdErr(k8sArgs, "PollImmediate error waiting for ReadinessIndicatorFile (on del): %v", err) } } diff --git a/pkg/server/config/manager.go b/pkg/server/config/manager.go index ddfd2a3d4..32a80052f 100644 --- a/pkg/server/config/manager.go +++ b/pkg/server/config/manager.go @@ -15,9 +15,11 @@ package config import ( + "context" "encoding/json" "fmt" "os" + "path/filepath" "github.com/fsnotify/fsnotify" @@ -34,12 +36,13 @@ const ( // Manager monitors the configuration of the primary CNI plugin, and // regenerates multus configuration whenever it gets updated. type Manager struct { - cniConfigData map[string]interface{} - configWatcher *fsnotify.Watcher - multusConfig *MultusConf - multusConfigDir string - multusConfigFilePath string - primaryCNIConfigPath string + cniConfigData map[string]interface{} + configWatcher *fsnotify.Watcher + multusConfig *MultusConf + multusConfigDir string + multusConfigFilePath string + readinessIndicatorFilePath string + primaryCNIConfigPath string } // NewManager returns a config manager object, configured to read the @@ -95,7 +98,12 @@ func newManager(config MultusConf, multusConfigDir, defaultCNIPluginName string, } } - watcher, err := newWatcher(multusConfigDir) + readinessIndicatorPath := "" + if config.ReadinessIndicatorFile != "" { + readinessIndicatorPath = filepath.Dir(config.ReadinessIndicatorFile) + } + + watcher, err := newWatcher(multusConfigDir, readinessIndicatorPath) if err != nil { return nil, err } @@ -105,11 +113,12 @@ func newManager(config MultusConf, multusConfigDir, defaultCNIPluginName string, } configManager := &Manager{ - configWatcher: watcher, - multusConfig: &config, - multusConfigDir: multusConfigDir, - multusConfigFilePath: cniPluginConfigFilePath(config.CniConfigDir, multusConfigFileName), - primaryCNIConfigPath: cniPluginConfigFilePath(multusConfigDir, defaultCNIPluginName), + configWatcher: watcher, + multusConfig: &config, + multusConfigDir: multusConfigDir, + multusConfigFilePath: cniPluginConfigFilePath(config.CniConfigDir, multusConfigFileName), + primaryCNIConfigPath: cniPluginConfigFilePath(multusConfigDir, defaultCNIPluginName), + readinessIndicatorFilePath: config.ReadinessIndicatorFile, } if err := configManager.loadPrimaryCNIConfigFromFile(); err != nil { @@ -157,7 +166,7 @@ func (m *Manager) loadPrimaryCNIConfigurationData(primaryCNIConfigData interface } // GenerateConfig generates a multus configuration from its current state -func (m Manager) GenerateConfig() (string, error) { +func (m *Manager) GenerateConfig() (string, error) { if err := m.loadPrimaryCNIConfigFromFile(); err != nil { _ = logging.Errorf("failed to read the primary CNI plugin config from %s", m.primaryCNIConfigPath) return "", nil @@ -168,22 +177,22 @@ func (m Manager) GenerateConfig() (string, error) { // MonitorPluginConfiguration monitors the configuration file pointed // to by the primaryCNIPluginName attribute, and re-generates the multus // configuration whenever the primary CNI config is updated. -func (m Manager) MonitorPluginConfiguration(shutDown <-chan struct{}, done chan<- struct{}) error { +func (m *Manager) MonitorPluginConfiguration(ctx context.Context, done chan<- struct{}) error { logging.Verbosef("started to watch file %s", m.primaryCNIConfigPath) for { select { case event := <-m.configWatcher.Events: - // we're watching the DIR where the config sits, and the event - // does not concern the primary CNI config. Skip it. - if event.Name != m.primaryCNIConfigPath { - logging.Debugf("skipping un-related event %v", event) + if !m.shouldRegenerateConfig(event) { continue } logging.Debugf("process event: %v", event) - if !shouldRegenerateConfig(event) { - continue + // if readinessIndicatorFile is removed, then restart multus + if m.readinessIndicatorFilePath != "" && m.readinessIndicatorFilePath == event.Name { + logging.Verbosef("readiness indicator file is gone. restart multus-daemon") + os.Remove(m.multusConfigFilePath) + os.Exit(2) } updatedConfig, err := m.GenerateConfig() @@ -192,7 +201,7 @@ func (m Manager) MonitorPluginConfiguration(shutDown <-chan struct{}, done chan< } logging.Debugf("Re-generated MultusCNI config: %s", updatedConfig) - if err := m.PersistMultusConfig(updatedConfig); err != nil { + if _, err := m.PersistMultusConfig(updatedConfig); err != nil { _ = logging.Errorf("failed to persist the multus configuration: %v", err) } if err := m.loadPrimaryCNIConfigFromFile(); err != nil { @@ -205,10 +214,9 @@ func (m Manager) MonitorPluginConfiguration(shutDown <-chan struct{}, done chan< } logging.Errorf("CNI monitoring error %v", err) - case <-shutDown: + case <-ctx.Done(): logging.Verbosef("Stopped monitoring, closing channel ...") _ = m.configWatcher.Close() - close(done) return nil } } @@ -216,9 +224,24 @@ func (m Manager) MonitorPluginConfiguration(shutDown <-chan struct{}, done chan< // PersistMultusConfig persists the provided configuration to the disc, with // Read / Write permissions. The output file path is `/00-multus.conf` -func (m Manager) PersistMultusConfig(config string) error { +func (m *Manager) PersistMultusConfig(config string) (string, error) { logging.Debugf("Writing Multus CNI configuration @ %s", m.multusConfigFilePath) - return os.WriteFile(m.multusConfigFilePath, []byte(config), UserRWPermission) + return m.multusConfigFilePath, os.WriteFile(m.multusConfigFilePath, []byte(config), UserRWPermission) +} + +func (m *Manager) shouldRegenerateConfig(event fsnotify.Event) bool { + // first, check the readiness indicator file existence + if event.Name == m.readinessIndicatorFilePath { + return event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) + } + + // we're watching the DIR where the config sits, and the event + // does not concern the primary CNI config. Skip it. + if event.Name == m.primaryCNIConfigPath { + return event.Has(fsnotify.Write) || event.Has(fsnotify.Create) + } + logging.Debugf("skipping un-related event %v", event) + return false } func getPrimaryCNIPluginName(multusAutoconfigDir string) (string, error) { @@ -233,7 +256,7 @@ func cniPluginConfigFilePath(cniConfigDir string, cniConfigFileName string) stri return cniConfigDir + fmt.Sprintf("/%s", cniConfigFileName) } -func newWatcher(cniConfigDir string) (*fsnotify.Watcher, error) { +func newWatcher(cniConfigDir string, readinessIndicatorDir string) (*fsnotify.Watcher, error) { watcher, err := fsnotify.NewWatcher() if err != nil { return nil, fmt.Errorf("failed to create new watcher for %q: %v", cniConfigDir, err) @@ -246,16 +269,18 @@ func newWatcher(cniConfigDir string) (*fsnotify.Watcher, error) { }() if err = watcher.Add(cniConfigDir); err != nil { - return nil, fmt.Errorf("failed to add watch on %q: %v", cniConfigDir, err) + return nil, fmt.Errorf("failed to add watch on %q for cni config: %v", cniConfigDir, err) + } + // if readinessIndicatorDir is different from cniConfigDir, + if readinessIndicatorDir != "" && cniConfigDir != readinessIndicatorDir { + if err = watcher.Add(readinessIndicatorDir); err != nil { + return nil, fmt.Errorf("failed to add watch on %q for readinessIndicator: %v", readinessIndicatorDir, err) + } } return watcher, nil } -func shouldRegenerateConfig(event fsnotify.Event) bool { - return event.Has(fsnotify.Write) || event.Has(fsnotify.Create) -} - func primaryCNIData(masterCNIPluginPath string) (interface{}, error) { masterCNIConfigData, err := os.ReadFile(masterCNIPluginPath) if err != nil { diff --git a/pkg/server/config/manager_test.go b/pkg/server/config/manager_test.go index 2ffb231c6..773a2f4ae 100644 --- a/pkg/server/config/manager_test.go +++ b/pkg/server/config/manager_test.go @@ -15,6 +15,7 @@ package config import ( + "context" "encoding/json" "fmt" "os" @@ -98,14 +99,15 @@ var _ = Describe("Configuration Manager", func() { config, err := configManager.GenerateConfig() Expect(err).NotTo(HaveOccurred()) - err = configManager.PersistMultusConfig(config) + _, err = configManager.PersistMultusConfig(config) Expect(err).NotTo(HaveOccurred()) + ctx, cancel := context.WithCancel(context.Background()) configWatcherDoneChannel := make(chan struct{}) - go func(stopChannel chan struct{}, doneChannel chan struct{}) { - err := configManager.MonitorPluginConfiguration(configWatcherDoneChannel, stopChannel) + go func(ctx context.Context, doneChannel chan struct{}) { + err := configManager.MonitorPluginConfiguration(ctx, doneChannel) Expect(err).NotTo(HaveOccurred()) - }(make(chan struct{}), configWatcherDoneChannel) + }(ctx, configWatcherDoneChannel) updatedCNIConfig := ` { @@ -126,7 +128,7 @@ var _ = Describe("Configuration Manager", func() { Expect(string(file)).To(Equal(config)) // stop groutine - configWatcherDoneChannel <- struct{}{} + cancel() }) When("the user requests the name of the multus configuration to be overridden", func() { diff --git a/pkg/server/server.go b/pkg/server/server.go index 4eb4e0770..6d13600a0 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -57,13 +57,18 @@ func FilesystemPreRequirements(rundir string) error { return nil } +func printCmdArgs(args *skel.CmdArgs) string { + return fmt.Sprintf("ContainerID:%q Netns:%q IfName:%q Args:%q Path:%q", + args.ContainerID, args.Netns, args.IfName, args.Args, args.Path) +} + // HandleCNIRequest is the CNI server handler function; it is invoked whenever // a CNI request is processed. func (s *Server) HandleCNIRequest(cmd string, k8sArgs *types.K8sArgs, cniCmdArgs *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) ([]byte, error) { var result []byte var err error - logging.Verbosef("%s starting CNI request %+v", cmd, cniCmdArgs) + logging.Verbosef("%s starting CNI request %s", cmd, printCmdArgs(cniCmdArgs)) switch cmd { case "ADD": result, err = cmdAdd(cniCmdArgs, k8sArgs, exec, kubeClient) @@ -74,10 +79,10 @@ func (s *Server) HandleCNIRequest(cmd string, k8sArgs *types.K8sArgs, cniCmdArgs default: return []byte(""), fmt.Errorf("unknown cmd type: %s", cmd) } - logging.Verbosef("%s finished CNI request %+v, result: %q, err: %v", cmd, *cniCmdArgs, string(result), err) + logging.Verbosef("%s finished CNI request %s, result: %q, err: %v", cmd, printCmdArgs(cniCmdArgs), string(result), err) if err != nil { // Prefix errors with request info for easier failure debugging - return nil, fmt.Errorf("%+v ERRORED: %v", *cniCmdArgs, err) + return nil, fmt.Errorf("%s ERRORED: %v", printCmdArgs(cniCmdArgs), err) } return result, nil } @@ -95,7 +100,7 @@ func (s *Server) HandleDelegateRequest(cmd string, k8sArgs *types.K8sArgs, cniCm return nil, err } - logging.Verbosef("%s starting delegate request %+v", cmd, cniCmdArgs) + logging.Verbosef("%s starting delegate request %s", cmd, printCmdArgs(cniCmdArgs)) switch cmd { case "ADD": result, err = cmdDelegateAdd(cniCmdArgs, k8sArgs, exec, kubeClient, multusConfig, interfaceAttributes) @@ -106,10 +111,10 @@ func (s *Server) HandleDelegateRequest(cmd string, k8sArgs *types.K8sArgs, cniCm default: return []byte(""), fmt.Errorf("unknown cmd type: %s", cmd) } - logging.Verbosef("%s finished Delegate request %+v, result: %q, err: %v", cmd, *cniCmdArgs, string(result), err) + logging.Verbosef("%s finished Delegate request %s, result: %q, err: %v", cmd, printCmdArgs(cniCmdArgs), string(result), err) if err != nil { // Prefix errors with request info for easier failure debugging - return nil, fmt.Errorf("%+v ERRORED: %v", *cniCmdArgs, err) + return nil, fmt.Errorf("%s ERRORED: %v", printCmdArgs(cniCmdArgs), err) } return result, nil } @@ -293,7 +298,7 @@ func (s *Server) handleDelegateRequest(r *http.Request) ([]byte, error) { result, err := s.HandleDelegateRequest(cmdType, k8sArgs, cniCmdArgs, s.exec, s.kubeclient, cr.InterfaceAttributes) if err != nil { // Prefix error with request information for easier debugging - return nil, fmt.Errorf("%+v %v", cniCmdArgs, err) + return nil, fmt.Errorf("%s %v", printCmdArgs(cniCmdArgs), err) } return result, nil } diff --git a/pkg/types/conf.go b/pkg/types/conf.go index 36d8d6f9f..8295bd9c6 100644 --- a/pkg/types/conf.go +++ b/pkg/types/conf.go @@ -22,6 +22,9 @@ import ( "os" "strings" "sync" + "time" + + utilwait "k8s.io/apimachinery/pkg/util/wait" "github.com/containernetworking/cni/libcni" "github.com/containernetworking/cni/pkg/skel" @@ -609,3 +612,13 @@ func CheckSystemNamespaces(namespace string, systemNamespaces []string) bool { } return false } + +// GetReadinessIndicatorFile waits for readinessIndicatorFile +func GetReadinessIndicatorFile(readinessIndicatorFile string) error { + pollDuration := 1000 * time.Millisecond + pollTimeout := 45 * time.Second + return utilwait.PollImmediate(pollDuration, pollTimeout, func() (bool, error) { + _, err := os.Stat(readinessIndicatorFile) + return err == nil, nil + }) +}