From 93745fac32010b5953d3dbe3ca04a5a8b055c166 Mon Sep 17 00:00:00 2001 From: Chenzhao Huang <949412843@qq.com> Date: Thu, 21 Nov 2024 23:01:17 +0800 Subject: [PATCH] 1121 --- pkg/yurtadm/cmd/join/join.go | 50 +++++++- pkg/yurtadm/cmd/join/joindata/data.go | 2 + pkg/yurtadm/cmd/join/phases/postcheck.go | 26 ++-- pkg/yurtadm/cmd/join/phases/prepare.go | 46 ++++--- pkg/yurtadm/constants/constants.go | 29 +++++ pkg/yurtadm/util/initsystem/initsystem.go | 3 + .../util/initsystem/initsystem_unix.go | 16 +++ .../util/initsystem/initsystem_windows.go | 64 ++++++++++ pkg/yurtadm/util/localnode/localnode.go | 118 ++++++++++++++++++ pkg/yurtadm/util/yurthub/yurthub_test.go | 8 ++ 10 files changed, 333 insertions(+), 29 deletions(-) create mode 100644 pkg/yurtadm/util/localnode/localnode.go diff --git a/pkg/yurtadm/cmd/join/join.go b/pkg/yurtadm/cmd/join/join.go index ee5285f9200..09f7f2d5985 100644 --- a/pkg/yurtadm/cmd/join/join.go +++ b/pkg/yurtadm/cmd/join/join.go @@ -20,6 +20,7 @@ package join import ( "fmt" "io" + "os" "strings" "github.com/pkg/errors" @@ -35,6 +36,7 @@ import ( "github.com/openyurtio/openyurt/pkg/util/kubernetes/kubeadm/app/util/apiclient" "github.com/openyurtio/openyurt/pkg/yurtadm/cmd/join/joindata" yurtphases "github.com/openyurtio/openyurt/pkg/yurtadm/cmd/join/phases" + "github.com/openyurtio/openyurt/pkg/yurtadm/constants" yurtconstants "github.com/openyurtio/openyurt/pkg/yurtadm/constants" "github.com/openyurtio/openyurt/pkg/yurtadm/util/edgenode" yurtadmutil "github.com/openyurtio/openyurt/pkg/yurtadm/util/kubernetes" @@ -52,6 +54,8 @@ type joinOptions struct { organizations string pauseImage string yurthubImage string + yurthubBinary string + hostControlPlaneAddr string // hostControlPlaneAddr is the address (ip:port) of host kubernetes cluster that used for yurthub local mode. namespace string caCertHashes []string unsafeSkipCAVerification bool @@ -124,7 +128,7 @@ func addJoinConfigFlags(flagSet *flag.FlagSet, joinOptions *joinOptions) { ) flagSet.StringVar( &joinOptions.nodeType, yurtconstants.NodeType, joinOptions.nodeType, - "Sets the node is edge or cloud", + "Sets the node is edge, cloud or local", ) flagSet.StringVar( &joinOptions.nodeName, yurtconstants.NodeName, joinOptions.nodeName, @@ -154,6 +158,14 @@ func addJoinConfigFlags(flagSet *flag.FlagSet, joinOptions *joinOptions) { &joinOptions.yurthubImage, yurtconstants.YurtHubImage, joinOptions.yurthubImage, "Sets the image version of yurthub component", ) + flagSet.StringVar( + &joinOptions.yurthubBinary, yurtconstants.YurtHubBinary, joinOptions.yurthubBinary, + "Sets the binary path of yurthub, this is used for deploying local mode yurthub in systemd", + ) + flagSet.StringVar( + &joinOptions.hostControlPlaneAddr, yurtconstants.HostControlPlaneAddr, joinOptions.hostControlPlaneAddr, + "Sets the address of hostControlPlaneAddr, which is the address (ip:port) of host kubernetes cluster that used for yurthub local mode", + ) flagSet.StringSliceVar( &joinOptions.caCertHashes, yurtconstants.TokenDiscoveryCAHash, joinOptions.caCertHashes, "For token-based discovery, validate that the root CA public key matches this hash (format: \":\").", @@ -227,6 +239,8 @@ type joinData struct { organizations string pauseImage string yurthubImage string + yurthubBinary string + hostControlPlaneAddr string yurthubTemplate string yurthubManifest string kubernetesVersion string @@ -257,6 +271,25 @@ func newJoinData(args []string, opt *joinOptions) (*joinData, error) { apiServerEndpoint = args[0] } + if opt.nodeType == constants.LocalNode { + // in local mode, it is necessary to prepare yurthub binary file for deploying systemd yurthub. + if len(opt.yurthubBinary) == 0 { + return nil, errors.New("yurthub binary filepath is empty, so unable to run systemd yurthub in local mode.") + } + _, err := os.Stat(opt.yurthubBinary) + if err != nil { + if os.IsNotExist(err) { + return nil, errors.New("yurthub binary file does not exist.") + } + return nil, errors.Wrapf(err, "stat yurthub binary file %s fail", opt.yurthubBinary) + } + + // in local mode, hostControlPlaneAddr is needed for systemd yurthub accessing host kubernetes cluster. + if len(opt.hostControlPlaneAddr) == 0 { + return nil, errors.New("host control plane address is empty, so unable to run systemd yurthub in local mode.") + } + } + if len(opt.token) == 0 { return nil, errors.New("join token is empty, so unable to bootstrap worker node.") } @@ -265,8 +298,8 @@ func newJoinData(args []string, opt *joinOptions) (*joinData, error) { return nil, errors.Errorf("the bootstrap token %s was not of the form %s", opt.token, yurtconstants.BootstrapTokenPattern) } - if opt.nodeType != yurtconstants.EdgeNode && opt.nodeType != yurtconstants.CloudNode { - return nil, errors.Errorf("node type(%s) is invalid, only \"edge and cloud\" are supported", opt.nodeType) + if opt.nodeType != yurtconstants.EdgeNode && opt.nodeType != yurtconstants.CloudNode && opt.nodeType != yurtconstants.LocalNode { + return nil, errors.Errorf("node type(%s) is invalid, only \"edge, cloud and local\" are supported", opt.nodeType) } if opt.unsafeSkipCAVerification && len(opt.caCertHashes) != 0 { @@ -298,6 +331,8 @@ func newJoinData(args []string, opt *joinOptions) (*joinData, error) { ignorePreflightErrors: ignoreErrors, pauseImage: opt.pauseImage, yurthubImage: opt.yurthubImage, + yurthubBinary: opt.yurthubBinary, + hostControlPlaneAddr: opt.hostControlPlaneAddr, yurthubServer: opt.yurthubServer, caCertHashes: opt.caCertHashes, organizations: opt.organizations, @@ -439,6 +474,15 @@ func (j *joinData) YurtHubImage() string { return j.yurthubImage } +// YurtHubBinary returns the YurtHub binary. +func (j *joinData) YurtHubBinary() string { + return j.yurthubBinary +} + +func (j *joinData) HostControlPlaneAddr() string { + return j.hostControlPlaneAddr +} + // YurtHubServer returns the YurtHub server addr. func (j *joinData) YurtHubServer() string { return j.yurthubServer diff --git a/pkg/yurtadm/cmd/join/joindata/data.go b/pkg/yurtadm/cmd/join/joindata/data.go index 98089565dc3..49beb5b659e 100644 --- a/pkg/yurtadm/cmd/join/joindata/data.go +++ b/pkg/yurtadm/cmd/join/joindata/data.go @@ -36,6 +36,8 @@ type YurtJoinData interface { JoinToken() string PauseImage() string YurtHubImage() string + YurtHubBinary() string + HostControlPlaneAddr() string YurtHubServer() string YurtHubTemplate() string YurtHubManifest() string diff --git a/pkg/yurtadm/cmd/join/phases/postcheck.go b/pkg/yurtadm/cmd/join/phases/postcheck.go index a70cae1cb52..3b4ab4a9607 100644 --- a/pkg/yurtadm/cmd/join/phases/postcheck.go +++ b/pkg/yurtadm/cmd/join/phases/postcheck.go @@ -20,7 +20,9 @@ import ( "k8s.io/klog/v2" "github.com/openyurtio/openyurt/pkg/yurtadm/cmd/join/joindata" + "github.com/openyurtio/openyurt/pkg/yurtadm/constants" "github.com/openyurtio/openyurt/pkg/yurtadm/util/kubernetes" + "github.com/openyurtio/openyurt/pkg/yurtadm/util/localnode" "github.com/openyurtio/openyurt/pkg/yurtadm/util/yurthub" ) @@ -33,15 +35,23 @@ func RunPostCheck(data joindata.YurtJoinData) error { klog.V(1).Infof("kubelet service is active") klog.V(1).Infof("waiting hub agent ready.") - if err := yurthub.CheckYurthubHealthz(data.YurtHubServer()); err != nil { - return err - } - klog.V(1).Infof("hub agent is ready") - - if err := yurthub.CleanHubBootstrapConfig(); err != nil { - return err + if data.NodeRegistration().WorkingMode == constants.LocalNode { + // check systemd yurthub is ready or not + if err := localnode.CheckYurthubStatus(); err != nil { + return err + } + klog.V(1).Infof("systemd yurthub agent is ready") + } else { + if err := yurthub.CheckYurthubHealthz(data.YurtHubServer()); err != nil { + return err + } + klog.V(1).Infof("staticpod yurthub agent is ready") + + if err := yurthub.CleanHubBootstrapConfig(); err != nil { + return err + } + klog.V(1).Infof("clean yurthub bootstrap config file success") } - klog.V(1).Infof("clean yurthub bootstrap config file success") return nil } diff --git a/pkg/yurtadm/cmd/join/phases/prepare.go b/pkg/yurtadm/cmd/join/phases/prepare.go index 484378baaa8..d74bf9fcf9a 100644 --- a/pkg/yurtadm/cmd/join/phases/prepare.go +++ b/pkg/yurtadm/cmd/join/phases/prepare.go @@ -26,6 +26,7 @@ import ( "github.com/openyurtio/openyurt/pkg/yurtadm/constants" "github.com/openyurtio/openyurt/pkg/yurtadm/util/edgenode" yurtadmutil "github.com/openyurtio/openyurt/pkg/yurtadm/util/kubernetes" + "github.com/openyurtio/openyurt/pkg/yurtadm/util/localnode" "github.com/openyurtio/openyurt/pkg/yurtadm/util/system" "github.com/openyurtio/openyurt/pkg/yurtadm/util/yurthub" ) @@ -33,9 +34,11 @@ import ( // RunPrepare executes the node initialization process. func RunPrepare(data joindata.YurtJoinData) error { // cleanup at first - staticPodsPath := filepath.Join(constants.KubeletConfigureDir, constants.ManifestsSubDirName) - if err := os.RemoveAll(staticPodsPath); err != nil { - klog.Warningf("remove %s: %v", staticPodsPath, err) + if data.NodeRegistration().WorkingMode != constants.LocalNode { + staticPodsPath := filepath.Join(constants.KubeletConfigureDir, constants.ManifestsSubDirName) + if err := os.RemoveAll(staticPodsPath); err != nil { + klog.Warningf("remove %s: %v", staticPodsPath, err) + } } if err := system.SetIpv4Forward(); err != nil { @@ -65,23 +68,30 @@ func RunPrepare(data joindata.YurtJoinData) error { if err := yurtadmutil.SetKubeletUnitConfig(); err != nil { return err } - if err := yurtadmutil.SetKubeletConfigForNode(); err != nil { - return err - } - if err := yurthub.SetHubBootstrapConfig(data.ServerAddr(), data.JoinToken(), data.CaCertHashes()); err != nil { - return err - } - if err := yurthub.AddYurthubStaticYaml(data, constants.StaticPodPath); err != nil { - return err - } - if len(data.StaticPodTemplateList()) != 0 { - // deploy user specified static pods - if err := edgenode.DeployStaticYaml(data.StaticPodManifestList(), data.StaticPodTemplateList(), constants.StaticPodPath); err != nil { + if data.NodeRegistration().WorkingMode == constants.LocalNode { + // deploy systemd yurthub in local mode + if err := localnode.DeployYurthubInSystemd(data.HostControlPlaneAddr(), data.ServerAddr(), data.YurtHubBinary()); err != nil { + return err + } + } else { + if err := yurtadmutil.SetKubeletConfigForNode(); err != nil { + return err + } + if err := yurthub.SetHubBootstrapConfig(data.ServerAddr(), data.JoinToken(), data.CaCertHashes()); err != nil { + return err + } + if err := yurthub.AddYurthubStaticYaml(data, constants.StaticPodPath); err != nil { + return err + } + if len(data.StaticPodTemplateList()) != 0 { + // deploy user specified static pods + if err := edgenode.DeployStaticYaml(data.StaticPodManifestList(), data.StaticPodTemplateList(), constants.StaticPodPath); err != nil { + return err + } + } + if err := yurtadmutil.SetDiscoveryConfig(data); err != nil { return err } - } - if err := yurtadmutil.SetDiscoveryConfig(data); err != nil { - return err } if data.CfgPath() == "" { if err := yurtadmutil.SetKubeadmJoinConfig(data); err != nil { diff --git a/pkg/yurtadm/constants/constants.go b/pkg/yurtadm/constants/constants.go index 3db86db21f5..d6c4e023723 100644 --- a/pkg/yurtadm/constants/constants.go +++ b/pkg/yurtadm/constants/constants.go @@ -35,6 +35,8 @@ const ( PauseImagePath = "registry.cn-hangzhou.aliyuncs.com/google_containers/pause:3.2" DefaultCertificatesDir = "/etc/kubernetes/pki" DefaultDockerCRISocket = "/var/run/dockershim.sock" + YurthubServiceFilepath = "/etc/systemd/system/yurthub.service" + YurthubEnvironmentFilePath = "/etc/systemd/system/yurthub.default" YurthubYamlName = "yurthub.yaml" YurthubStaticPodManifest = "yurthub" YurthubNamespace = "kube-system" @@ -72,6 +74,7 @@ const ( EdgeNode = "edge" CloudNode = "cloud" + LocalNode = "local" // CertificatesDir CertificatesDir = "cert-dir" @@ -107,6 +110,10 @@ const ( Namespace = "namespace" // YurtHubImage flag sets the yurthub image for worker node. YurtHubImage = "yurthub-image" + // YurtHubBinary flag sets the yurthub Binary for worker node. + YurtHubBinary = "yurthub-binary" + // HostControlPlaneAddr flag sets the address of host kubernetes cluster + HostControlPlaneAddr = "host-control-plane-addr" // YurtHubServerAddr flag set the address of yurthub server (not proxy server!) YurtHubServerAddr = "yurthub-server-addr" // ServerAddr flag set the address of kubernetes kube-apiserver @@ -272,5 +279,27 @@ spec: hostNetwork: true priorityClassName: system-node-critical priority: 2000001000 +` + + YurthubSyetmdServiceContent = ` +[Unit] +Description=local mode yurthub is deployed in systemd +Documentation=https://github.com/openyurtio/openyurt/pull/2124 + +[Service] +EnvironmentFile=/etc/systemd/system/yurthub.default +ExecStart=/usr/bin/yurthub --working-mode ${WORKINGMODE} --node-name ${NODENAME} --server-addr ${SERVERADDR} --host-control-plane-address ${HOSTCONTROLPLANEADDRESS} +Restart=always +StartLimitInterval=0 +RestartSec=10 + +[Install] +WantedBy=multi-user.target` + + YurthubSyetmdServiceEnvironmentFileContent = ` +WORKINGMODE=local +NODENAME=testnode +SERVERADDR=121.40.250.117:6443 +HOSTCONTROLPLANEADDRESS=121.40.250.117:6443 ` ) diff --git a/pkg/yurtadm/util/initsystem/initsystem.go b/pkg/yurtadm/util/initsystem/initsystem.go index 96c07ea5b15..7dfe5d9d3da 100644 --- a/pkg/yurtadm/util/initsystem/initsystem.go +++ b/pkg/yurtadm/util/initsystem/initsystem.go @@ -27,4 +27,7 @@ type InitSystem interface { // ServiceIsActive ensures the service is running, or attempting to run. (crash looping in the case of kubelet) ServiceIsActive(service string) bool + + // ServiceToStart tries to start a specific service + ServiceStart(service string) error } diff --git a/pkg/yurtadm/util/initsystem/initsystem_unix.go b/pkg/yurtadm/util/initsystem/initsystem_unix.go index efbab4c5f5a..59fff061bcf 100644 --- a/pkg/yurtadm/util/initsystem/initsystem_unix.go +++ b/pkg/yurtadm/util/initsystem/initsystem_unix.go @@ -52,6 +52,12 @@ func (openrc OpenRCInitSystem) ServiceIsActive(service string) bool { return !strings.Contains(outStr, "stopped") && !strings.Contains(outStr, "does not exist") } +// ServiceStart tries to start a specific service +func (openrc OpenRCInitSystem) ServiceStart(service string) error { + args := []string{service, "start"} + return exec.Command("rc-service", args...).Run() +} + // SystemdInitSystem defines systemd type SystemdInitSystem struct{} @@ -94,6 +100,16 @@ func (sysd SystemdInitSystem) ServiceIsActive(service string) bool { return false } +// ServiceStart tries to start a specific service +func (sysd SystemdInitSystem) ServiceStart(service string) error { + // Before we try to start any service, make sure that systemd is ready + if err := sysd.reloadSystemd(); err != nil { + return err + } + args := []string{"start", service} + return exec.Command("systemctl", args...).Run() +} + // GetInitSystem returns an InitSystem for the current system, or nil // if we cannot detect a supported init system. // This indicates we will skip init system checks, not an error. diff --git a/pkg/yurtadm/util/initsystem/initsystem_windows.go b/pkg/yurtadm/util/initsystem/initsystem_windows.go index c45751f4750..fb7d79b39ff 100644 --- a/pkg/yurtadm/util/initsystem/initsystem_windows.go +++ b/pkg/yurtadm/util/initsystem/initsystem_windows.go @@ -22,7 +22,9 @@ package initsystem import ( "fmt" + "time" + "github.com/pkg/errors" "golang.org/x/sys/windows/svc" "golang.org/x/sys/windows/svc/mgr" ) @@ -30,6 +32,68 @@ import ( // WindowsInitSystem is the windows implementation of InitSystem type WindowsInitSystem struct{} +// ServiceStart tries to start a specific service +// Following Windows documentation: https://docs.microsoft.com/en-us/windows/desktop/Services/starting-a-service +func (sysd WindowsInitSystem) ServiceStart(service string) error { + m, err := mgr.Connect() + if err != nil { + return err + } + defer m.Disconnect() + + s, err := m.OpenService(service) + if err != nil { + return errors.Wrapf(err, "could not access service %s", service) + } + defer s.Close() + + // Check if service is already started + status, err := s.Query() + if err != nil { + return errors.Wrapf(err, "could not query service %s", service) + } + + if status.State != svc.Stopped && status.State != svc.StopPending { + return nil + } + + timeout := time.Now().Add(10 * time.Second) + for status.State != svc.Stopped { + if timeout.Before(time.Now()) { + return errors.Errorf("timeout waiting for %s service to stop", service) + } + time.Sleep(300 * time.Millisecond) + status, err = s.Query() + if err != nil { + return errors.Wrapf(err, "could not retrieve %s service status", service) + } + } + + // Start the service + err = s.Start("is", "manual-started") + if err != nil { + return errors.Wrapf(err, "could not start service %s", service) + } + + // Check that the start was successful + status, err = s.Query() + if err != nil { + return errors.Wrapf(err, "could not query service %s", service) + } + timeout = time.Now().Add(10 * time.Second) + for status.State != svc.Running { + if timeout.Before(time.Now()) { + return errors.Errorf("timeout waiting for %s service to start", service) + } + time.Sleep(300 * time.Millisecond) + status, err = s.Query() + if err != nil { + return errors.Wrapf(err, "could not retrieve %s service status", service) + } + } + return nil +} + // ServiceIsEnabled ensures the service is enabled to start on each boot. func (sysd WindowsInitSystem) ServiceIsEnabled(service string) bool { m, err := mgr.Connect() diff --git a/pkg/yurtadm/util/localnode/localnode.go b/pkg/yurtadm/util/localnode/localnode.go new file mode 100644 index 00000000000..cc0f982889c --- /dev/null +++ b/pkg/yurtadm/util/localnode/localnode.go @@ -0,0 +1,118 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package localnode + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + + "github.com/openyurtio/openyurt/pkg/yurtadm/constants" + "github.com/openyurtio/openyurt/pkg/yurtadm/util/initsystem" + "k8s.io/klog/v2" +) + +// DeployYurthubInSystemd deploys yurthub in systemd +func DeployYurthubInSystemd(hostControlPlaneAddr string, serverAddr string, yurthubBinary string) error { + if err := SetYurthubService(hostControlPlaneAddr, serverAddr, yurthubBinary); err != nil { + return err + } + if err := EnableYurthubService(); err != nil { + return err + } + if err := StartYurthubService(); err != nil { + return err + } + return nil +} + +// SetYurthubService configure yurthub service. +func SetYurthubService(hostControlPlaneAddr string, serverAddr string, yurthubBinary string) error { + klog.Info("Setting Yurthub service.") + yurthubServiceDir := filepath.Dir(constants.YurthubServiceFilepath) + if _, err := os.Stat(yurthubServiceDir); err != nil { + if os.IsNotExist(err) { + if err := os.MkdirAll(yurthubServiceDir, os.ModePerm); err != nil { + klog.Errorf("Create dir %s fail: %v", yurthubServiceDir, err) + return err + } + } else { + klog.Errorf("Describe dir %s fail: %v", yurthubServiceDir, err) + return err + } + } + // copy yurthub binary to /usr/bin + cmd := exec.Command("cp", yurthubBinary, "/usr/bin") + if err := cmd.Run(); err != nil { + klog.Errorf("Copy yurthub binary to /usr/bin fail: %v", err) + return err + } + klog.Info("yurthub binary is in /usr/bin.") + + // yurthub.default contains the environment variables that yurthub needs + if err := os.WriteFile(constants.YurthubEnvironmentFilePath, []byte(constants.YurthubSyetmdServiceEnvironmentFileContent), 0644); err != nil { + klog.Errorf("Write file %s fail: %v", constants.YurthubEnvironmentFilePath, err) + return err + } + + // yurthub.service contains the configuration of yurthub service + if err := os.WriteFile(constants.YurthubServiceFilepath, []byte(constants.YurthubSyetmdServiceContent), 0644); err != nil { + klog.Errorf("Write file %s fail: %v", constants.YurthubServiceFilepath, err) + return err + } + return nil +} + +// EnableYurthubService enable yurthub service +func EnableYurthubService() error { + initSystem, err := initsystem.GetInitSystem() + if err != nil { + return err + } + + if !initSystem.ServiceIsEnabled("yurthub") { + if err = initSystem.ServiceEnable("yurthub"); err != nil { + return fmt.Errorf("enable yurthub service failed") + } + } + return nil +} + +// StartYurthubService start yurthub service +func StartYurthubService() error { + initSystem, err := initsystem.GetInitSystem() + if err != nil { + return err + } + if err = initSystem.ServiceStart("yurthub"); err != nil { + return fmt.Errorf("start yurthub service failed") + } + return nil +} + +// CheckYurthubStatus check if yurthub is healthy. +func CheckYurthubStatus() error { + initSystem, err := initsystem.GetInitSystem() + if err != nil { + return err + } + if ok := initSystem.ServiceIsActive("yurthub"); !ok { + return fmt.Errorf("yurthub is not active. ") + } + return nil +} diff --git a/pkg/yurtadm/util/yurthub/yurthub_test.go b/pkg/yurtadm/util/yurthub/yurthub_test.go index 755fc456ce1..87e0d250f3d 100644 --- a/pkg/yurtadm/util/yurthub/yurthub_test.go +++ b/pkg/yurtadm/util/yurthub/yurthub_test.go @@ -255,6 +255,14 @@ func (j *testData) YurtHubImage() string { return "" } +func (j *testData) YurtHubBinary() string { + return "" +} + +func (j *testData) HostControlPlaneAddr() string { + return "" +} + func (j *testData) YurtHubServer() string { return "" }