From 76d5a2cce6c574bea7dc2e6cbd1e741319ad6673 Mon Sep 17 00:00:00 2001 From: Tommy Xiao Date: Mon, 21 Oct 2024 22:53:49 +0800 Subject: [PATCH 1/2] fix templates and executor modules --- pkg/agent/containerd/config.go | 4 +- pkg/agent/templates/templates.go | 87 ++++++++++++++++++++++ pkg/agent/templates/templates_linux.go | 73 +++++++----------- pkg/agent/templates/templates_windows.go | 80 ++++++++------------ pkg/daemons/executor/embed.go | 95 ++++++++++++++++++++---- pkg/daemons/executor/embed_linux.go | 16 ++++ pkg/daemons/executor/embed_windows.go | 90 ++++++++++++++++++++++ pkg/daemons/executor/etcd.go | 5 +- pkg/daemons/executor/executor.go | 51 ++++++++----- pkg/util/command.go | 21 ++++++ 10 files changed, 388 insertions(+), 134 deletions(-) create mode 100644 pkg/daemons/executor/embed_linux.go create mode 100644 pkg/daemons/executor/embed_windows.go create mode 100644 pkg/util/command.go diff --git a/pkg/agent/containerd/config.go b/pkg/agent/containerd/config.go index 6cf72a84..b5a5f40b 100644 --- a/pkg/agent/containerd/config.go +++ b/pkg/agent/containerd/config.go @@ -71,7 +71,7 @@ func writeContainerdHosts(cfg *config.Node, containerdConfig templates.Container } // cleanContainerdHosts removes any registry host config dirs containing a hosts.toml file -// with a header that indicates it was created by k8e, or directories where a hosts.toml +// with a header that indicates it was created by k3s, or directories where a hosts.toml // is about to be written. Unmanaged directories not containing this file, or containing // a file without the header, are left alone. func cleanContainerdHosts(dir string, hosts HostConfigs) error { @@ -81,7 +81,7 @@ func cleanContainerdHosts(dir string, hosts HostConfigs) error { os.RemoveAll(hostsDir) } - // clean directories that contain a hosts.toml with a header indicating it was created by k8e + // clean directories that contain a hosts.toml with a header indicating it was created by k3s ents, err := os.ReadDir(dir) if err != nil && !os.IsNotExist(err) { return err diff --git a/pkg/agent/templates/templates.go b/pkg/agent/templates/templates.go index 2b2ecaf7..0d2478cc 100644 --- a/pkg/agent/templates/templates.go +++ b/pkg/agent/templates/templates.go @@ -1,9 +1,14 @@ package templates import ( + "bytes" + "net/url" + "text/template" + "github.com/rancher/wharfie/pkg/registries" "github.com/xiaods/k8e/pkg/daemons/config" + "github.com/xiaods/k8e/pkg/version" ) type ContainerdRuntimeConfig struct { @@ -17,7 +22,89 @@ type ContainerdConfig struct { SystemdCgroup bool IsRunningInUserNS bool EnableUnprivileged bool + NoDefaultEndpoint bool PrivateRegistryConfig *registries.Registry ExtraRuntimes map[string]ContainerdRuntimeConfig Program string } + +type RegistryEndpoint struct { + OverridePath bool + URL *url.URL + Rewrites map[string]string + Config registries.RegistryConfig +} + +type HostConfig struct { + Default *RegistryEndpoint + Program string + Endpoints []RegistryEndpoint +} + +var HostsTomlHeader = "# File generated by " + version.Program + ". DO NOT EDIT.\n" + +const HostsTomlTemplate = ` +{{- /* */ -}} +# File generated by {{ .Program }}. DO NOT EDIT. +{{ with $e := .Default }} +{{- if $e.URL }} +server = "{{ $e.URL }}" +capabilities = ["pull", "resolve", "push"] +{{ end }} +{{- if $e.Config.TLS }} +{{- if $e.Config.TLS.CAFile }} +ca = [{{ printf "%q" $e.Config.TLS.CAFile }}] +{{- end }} +{{- if or $e.Config.TLS.CertFile $e.Config.TLS.KeyFile }} +client = [[{{ printf "%q" $e.Config.TLS.CertFile }}, {{ printf "%q" $e.Config.TLS.KeyFile }}]] +{{- end }} +{{- if $e.Config.TLS.InsecureSkipVerify }} +skip_verify = true +{{- end }} +{{ end }} +{{ end }} +[host] +{{ range $e := .Endpoints -}} +[host."{{ $e.URL }}"] + capabilities = ["pull", "resolve"] + {{- if $e.OverridePath }} + override_path = true + {{- end }} +{{- if $e.Config.TLS }} + {{- if $e.Config.TLS.CAFile }} + ca = [{{ printf "%q" $e.Config.TLS.CAFile }}] + {{- end }} + {{- if or $e.Config.TLS.CertFile $e.Config.TLS.KeyFile }} + client = [[{{ printf "%q" $e.Config.TLS.CertFile }}, {{ printf "%q" $e.Config.TLS.KeyFile }}]] + {{- end }} + {{- if $e.Config.TLS.InsecureSkipVerify }} + skip_verify = true + {{- end }} +{{ end }} +{{- if $e.Rewrites }} + [host."{{ $e.URL }}".rewrite] + {{- range $pattern, $replace := $e.Rewrites }} + "{{ $pattern }}" = "{{ $replace }}" + {{- end }} +{{ end }} +{{ end -}} +` + +func ParseTemplateFromConfig(templateBuffer string, config interface{}) (string, error) { + out := new(bytes.Buffer) + t := template.Must(template.New("compiled_template").Funcs(templateFuncs).Parse(templateBuffer)) + template.Must(t.New("base").Parse(ContainerdConfigTemplate)) + if err := t.Execute(out, config); err != nil { + return "", err + } + return out.String(), nil +} + +func ParseHostsTemplateFromConfig(templateBuffer string, config interface{}) (string, error) { + out := new(bytes.Buffer) + t := template.Must(template.New("compiled_template").Funcs(templateFuncs).Parse(templateBuffer)) + if err := t.Execute(out, config); err != nil { + return "", err + } + return out.String(), nil +} \ No newline at end of file diff --git a/pkg/agent/templates/templates_linux.go b/pkg/agent/templates/templates_linux.go index 2a8e51df..6fb66094 100644 --- a/pkg/agent/templates/templates_linux.go +++ b/pkg/agent/templates/templates_linux.go @@ -3,13 +3,14 @@ package templates import ( - "bytes" "text/template" ) const ContainerdConfigTemplate = ` +{{- /* */ -}} # File generated by {{ .Program }}. DO NOT EDIT. Use config.toml.tmpl instead. version = 2 + [plugins."io.containerd.internal.v1.opt"] path = "{{ .NodeConfig.Containerd.Opt }}" [plugins."io.containerd.grpc.v1.cri"] @@ -18,6 +19,7 @@ version = 2 enable_selinux = {{ .NodeConfig.SELinux }} enable_unprivileged_ports = {{ .EnableUnprivileged }} enable_unprivileged_icmp = {{ .EnableUnprivileged }} + {{- if .DisableCgroup}} disable_cgroup = true {{end}} @@ -25,9 +27,11 @@ version = 2 disable_apparmor = true restrict_oom_score_adj = true {{end}} + {{- if .NodeConfig.AgentConfig.PauseImage }} sandbox_image = "{{ .NodeConfig.AgentConfig.PauseImage }}" {{end}} + {{- if .NodeConfig.AgentConfig.Snapshotter }} [plugins."io.containerd.grpc.v1.cri".containerd] snapshotter = "{{ .NodeConfig.AgentConfig.Snapshotter }}" @@ -40,19 +44,11 @@ cri_keychain_image_service_path = "{{ .NodeConfig.AgentConfig.ImageServiceSocket [plugins."io.containerd.snapshotter.v1.stargz".cri_keychain] enable_keychain = true {{end}} + +[plugins."io.containerd.snapshotter.v1.stargz".registry] + config_path = "{{ .NodeConfig.Containerd.Registry }}" + {{ if .PrivateRegistryConfig }} -{{ if .PrivateRegistryConfig.Mirrors }} -[plugins."io.containerd.snapshotter.v1.stargz".registry.mirrors]{{end}} -{{range $k, $v := .PrivateRegistryConfig.Mirrors }} -[plugins."io.containerd.snapshotter.v1.stargz".registry.mirrors."{{$k}}"] - endpoint = [{{range $i, $j := $v.Endpoints}}{{if $i}}, {{end}}{{printf "%q" .}}{{end}}] -{{if $v.Rewrites}} - [plugins."io.containerd.snapshotter.v1.stargz".registry.mirrors."{{$k}}".rewrite] -{{range $pattern, $replace := $v.Rewrites}} - "{{$pattern}}" = "{{$replace}}" -{{end}} -{{end}} -{{end}} {{range $k, $v := .PrivateRegistryConfig.Configs }} {{ if $v.Auth }} [plugins."io.containerd.snapshotter.v1.stargz".registry.configs."{{$k}}".auth] @@ -61,16 +57,15 @@ enable_keychain = true {{ if $v.Auth.Auth }}auth = {{ printf "%q" $v.Auth.Auth }}{{end}} {{ if $v.Auth.IdentityToken }}identitytoken = {{ printf "%q" $v.Auth.IdentityToken }}{{end}} {{end}} -{{ if $v.TLS }} -[plugins."io.containerd.snapshotter.v1.stargz".registry.configs."{{$k}}".tls] - {{ if $v.TLS.CAFile }}ca_file = "{{ $v.TLS.CAFile }}"{{end}} - {{ if $v.TLS.CertFile }}cert_file = "{{ $v.TLS.CertFile }}"{{end}} - {{ if $v.TLS.KeyFile }}key_file = "{{ $v.TLS.KeyFile }}"{{end}} - {{ if $v.TLS.InsecureSkipVerify }}insecure_skip_verify = true{{end}} {{end}} {{end}} {{end}} {{end}} + +{{- if not .NodeConfig.NoFlannel }} +[plugins."io.containerd.grpc.v1.cri".cni] + bin_dir = "{{ .NodeConfig.AgentConfig.CNIBinDir }}" + conf_dir = "{{ .NodeConfig.AgentConfig.CNIConfDir }}" {{end}} {{- if or .NodeConfig.Containerd.BlockIOConfig .NodeConfig.Containerd.RDTConfig }} @@ -81,21 +76,14 @@ enable_keychain = true [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runc] runtime_type = "io.containerd.runc.v2" + [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runc.options] SystemdCgroup = {{ .SystemdCgroup }} + +[plugins."io.containerd.grpc.v1.cri".registry] + config_path = "{{ .NodeConfig.Containerd.Registry }}" + {{ if .PrivateRegistryConfig }} -{{ if .PrivateRegistryConfig.Mirrors }} -[plugins."io.containerd.grpc.v1.cri".registry.mirrors]{{end}} -{{range $k, $v := .PrivateRegistryConfig.Mirrors }} -[plugins."io.containerd.grpc.v1.cri".registry.mirrors."{{$k}}"] - endpoint = [{{range $i, $j := $v.Endpoints}}{{if $i}}, {{end}}{{printf "%q" .}}{{end}}] -{{if $v.Rewrites}} - [plugins."io.containerd.grpc.v1.cri".registry.mirrors."{{$k}}".rewrite] -{{range $pattern, $replace := $v.Rewrites}} - "{{$pattern}}" = "{{$replace}}" -{{end}} -{{end}} -{{end}} {{range $k, $v := .PrivateRegistryConfig.Configs }} {{ if $v.Auth }} [plugins."io.containerd.grpc.v1.cri".registry.configs."{{$k}}".auth] @@ -104,15 +92,9 @@ enable_keychain = true {{ if $v.Auth.Auth }}auth = {{ printf "%q" $v.Auth.Auth }}{{end}} {{ if $v.Auth.IdentityToken }}identitytoken = {{ printf "%q" $v.Auth.IdentityToken }}{{end}} {{end}} -{{ if $v.TLS }} -[plugins."io.containerd.grpc.v1.cri".registry.configs."{{$k}}".tls] - {{ if $v.TLS.CAFile }}ca_file = "{{ $v.TLS.CAFile }}"{{end}} - {{ if $v.TLS.CertFile }}cert_file = "{{ $v.TLS.CertFile }}"{{end}} - {{ if $v.TLS.KeyFile }}key_file = "{{ $v.TLS.KeyFile }}"{{end}} - {{ if $v.TLS.InsecureSkipVerify }}insecure_skip_verify = true{{end}} -{{end}} {{end}} {{end}} + {{range $k, $v := .ExtraRuntimes}} [plugins."io.containerd.grpc.v1.cri".containerd.runtimes."{{$k}}"] runtime_type = "{{$v.RuntimeType}}" @@ -122,12 +104,9 @@ enable_keychain = true {{end}} ` -func ParseTemplateFromConfig(templateBuffer string, config interface{}) (string, error) { - out := new(bytes.Buffer) - t := template.Must(template.New("compiled_template").Parse(templateBuffer)) - template.Must(t.New("base").Parse(ContainerdConfigTemplate)) - if err := t.Execute(out, config); err != nil { - return "", err - } - return out.String(), nil -} +// Linux config templates do not need fixups +var templateFuncs = template.FuncMap{ + "deschemify": func(s string) string { + return s + }, +} \ No newline at end of file diff --git a/pkg/agent/templates/templates_windows.go b/pkg/agent/templates/templates_windows.go index 799afd47..e3507dc3 100644 --- a/pkg/agent/templates/templates_windows.go +++ b/pkg/agent/templates/templates_windows.go @@ -4,20 +4,22 @@ package templates import ( - "bytes" "net/url" "strings" "text/template" ) const ContainerdConfigTemplate = ` +{{- /* */ -}} +# File generated by {{ .Program }}. DO NOT EDIT. Use config.toml.tmpl instead. version = 2 -root = "{{ replace .NodeConfig.Containerd.Root }}" -state = "{{ replace .NodeConfig.Containerd.State }}" +root = {{ printf "%q" .NodeConfig.Containerd.Root }} +state = {{ printf "%q" .NodeConfig.Containerd.State }} plugin_dir = "" disabled_plugins = [] required_plugins = [] oom_score = 0 + [grpc] address = "{{ deschemify .NodeConfig.Containerd.Address }}" tcp_address = "" @@ -27,25 +29,31 @@ oom_score = 0 gid = 0 max_recv_message_size = 16777216 max_send_message_size = 16777216 + [ttrpc] address = "" uid = 0 gid = 0 + [debug] address = "" uid = 0 gid = 0 level = "" + [metrics] address = "" grpc_histogram = false + [cgroup] path = "" + [timeouts] "io.containerd.timeout.shim.cleanup" = "5s" "io.containerd.timeout.shim.load" = "5s" "io.containerd.timeout.shim.shutdown" = "3s" "io.containerd.timeout.task.state" = "2s" + [plugins] [plugins."io.containerd.gc.v1.scheduler"] pause_threshold = 0.02 @@ -100,14 +108,15 @@ oom_score = 0 privileged_without_host_devices = false base_runtime_spec = "" [plugins."io.containerd.grpc.v1.cri".cni] - bin_dir = "{{ replace .NodeConfig.AgentConfig.CNIBinDir }}" - conf_dir = "{{ replace .NodeConfig.AgentConfig.CNIConfDir }}" + bin_dir = {{ printf "%q" .NodeConfig.AgentConfig.CNIBinDir }} + conf_dir = {{ printf "%q" .NodeConfig.AgentConfig.CNIConfDir }} max_conf_num = 1 conf_template = "" [plugins."io.containerd.grpc.v1.cri".registry] - config_path = "" + config_path = {{ printf "%q" .NodeConfig.Containerd.Registry }} + + {{ if .PrivateRegistryConfig }} {{range $k, $v := .PrivateRegistryConfig.Configs }} - [plugins."io.containerd.grpc.v1.cri".registry.auths] {{ if $v.Auth }} [plugins."io.containerd.grpc.v1.cri".registry.configs.auth."{{$k}}"] {{ if $v.Auth.Username }}username = {{ printf "%q" $v.Auth.Username }}{{end}} @@ -115,35 +124,15 @@ oom_score = 0 {{ if $v.Auth.Auth }}auth = {{ printf "%q" $v.Auth.Auth }}{{end}} {{ if $v.Auth.IdentityToken }}identitytoken = {{ printf "%q" $v.Auth.IdentityToken }}{{end}} {{end}} - [plugins."io.containerd.grpc.v1.cri".registry.configs] - {{ if $v.TLS }} - [plugins."io.containerd.grpc.v1.cri".registry.configs.tls."{{$k}}"] - {{ if $v.TLS.CAFile }}ca_file = "{{ $v.TLS.CAFile }}"{{end}} - {{ if $v.TLS.CertFile }}cert_file = "{{ $v.TLS.CertFile }}"{{end}} - {{ if $v.TLS.KeyFile }}key_file = "{{ $v.TLS.KeyFile }}"{{end}} - {{ if $v.TLS.InsecureSkipVerify }}insecure_skip_verify = true{{end}} - {{end}} {{end}} - [plugins."io.containerd.grpc.v1.cri".registry.mirrors] - {{ if .PrivateRegistryConfig.Mirrors }} - {{range $k, $v := .PrivateRegistryConfig.Mirrors }} - [plugins."io.containerd.grpc.v1.cri".registry.mirrors."{{$k}}"] - endpoint = [{{range $i, $j := $v.Endpoints}}{{if $i}}, {{end}}{{printf "%q" .}}{{end}}] - {{if $v.Rewrites}} - [plugins."io.containerd.grpc.v1.cri".registry.mirrors."{{$k}}".rewrite] - {{range $pattern, $replace := $v.Rewrites}} - "{{$pattern}}" = "{{$replace}}" - {{end}} - {{end}} {{end}} - {{end}} [plugins."io.containerd.grpc.v1.cri".image_decryption] key_model = "" [plugins."io.containerd.grpc.v1.cri".x509_key_pair_streaming] tls_cert_file = "" tls_key_file = "" [plugins."io.containerd.internal.v1.opt"] - path = "{{ replace .NodeConfig.Containerd.Opt }}" + path = {{ printf "%q" .NodeConfig.Containerd.Opt }} [plugins."io.containerd.internal.v1.restart"] interval = "10s" [plugins."io.containerd.metadata.v1.bolt"] @@ -154,27 +143,16 @@ oom_score = 0 default = ["windows", "windows-lcow"] ` -func ParseTemplateFromConfig(templateBuffer string, config interface{}) (string, error) { - out := new(bytes.Buffer) - funcs := template.FuncMap{ - "replace": func(s string) string { - return strings.ReplaceAll(s, "\\", "\\\\") - }, - "deschemify": func(s string) string { - if strings.HasPrefix(s, "npipe:") { - u, err := url.Parse(s) - if err != nil { - return "" - } - return u.Path +// Windows config templates need named pipe addresses fixed up +var templateFuncs = template.FuncMap{ + "deschemify": func(s string) string { + if strings.HasPrefix(s, "npipe:") { + u, err := url.Parse(s) + if err != nil { + return "" } - return s - }, - } - t := template.Must(template.New("compiled_template").Funcs(funcs).Parse(templateBuffer)) - template.Must(t.New("base").Parse(ContainerdConfigTemplate)) - if err := t.Execute(out, config); err != nil { - return "", err - } - return out.String(), nil -} + return u.Path + } + return s + }, +} \ No newline at end of file diff --git a/pkg/daemons/executor/embed.go b/pkg/daemons/executor/embed.go index 25783015..51ee80d2 100644 --- a/pkg/daemons/executor/embed.go +++ b/pkg/daemons/executor/embed.go @@ -7,17 +7,20 @@ import ( "context" "flag" "net/http" + "os" "runtime" "runtime/debug" "strconv" "time" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" + "github.com/xiaods/k8e/pkg/agent/containerd" + "github.com/xiaods/k8e/pkg/agent/cridockerd" "github.com/xiaods/k8e/pkg/cli/cmds" daemonconfig "github.com/xiaods/k8e/pkg/daemons/config" "github.com/xiaods/k8e/pkg/util" "github.com/xiaods/k8e/pkg/version" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" @@ -27,16 +30,17 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" toolswatch "k8s.io/client-go/tools/watch" - ccm "k8s.io/cloud-provider" cloudprovider "k8s.io/cloud-provider" cloudproviderapi "k8s.io/cloud-provider/api" ccmapp "k8s.io/cloud-provider/app" cloudcontrollerconfig "k8s.io/cloud-provider/app/config" + "k8s.io/cloud-provider/names" ccmopt "k8s.io/cloud-provider/options" cliflag "k8s.io/component-base/cli/flag" "k8s.io/klog/v2" - "k8s.io/kubernetes/cmd/kube-apiserver/app" + apiapp "k8s.io/kubernetes/cmd/kube-apiserver/app" cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app" + proxy "k8s.io/kubernetes/cmd/kube-proxy/app" sapp "k8s.io/kubernetes/cmd/kube-scheduler/app" kubelet "k8s.io/kubernetes/cmd/kubelet/app" @@ -89,19 +93,45 @@ func (e *Embedded) Kubelet(ctx context.Context, args []string) error { if err := util.WaitForAPIServerReady(ctx, e.nodeConfig.AgentConfig.KubeConfigKubelet, util.DefaultAPIServerReadyTimeout); err != nil { logrus.Fatalf("Kubelet failed to wait for apiserver ready: %v", err) } - logrus.Fatalf("kubelet exited: %v", command.ExecuteContext(ctx)) + err := command.ExecuteContext(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + logrus.Errorf("kubelet exited: %v", err) + os.Exit(1) + } + os.Exit(0) + }() + + return nil +} + +func (e *Embedded) KubeProxy(ctx context.Context, args []string) error { + command := proxy.NewProxyCommand() + command.SetArgs(daemonconfig.GetArgs(platformKubeProxyArgs(e.nodeConfig), args)) + + go func() { + defer func() { + if err := recover(); err != nil { + logrus.WithField("stack", string(debug.Stack())).Fatalf("kube-proxy panic: %v", err) + } + }() + err := command.ExecuteContext(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + logrus.Errorf("kube-proxy exited: %v", err) + os.Exit(1) + } + os.Exit(0) }() return nil } func (*Embedded) APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error) { - startupConfig := <-app.StartupConfig + startupConfig := <-apiapp.StartupConfig return startupConfig.Authenticator, startupConfig.Handler, nil } func (*Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error { - command := app.NewAPIServerCommand(ctx.Done()) + command := apiapp.NewAPIServerCommand(ctx.Done()) command.SetArgs(args) go func() { @@ -111,7 +141,12 @@ func (*Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args logrus.WithField("stack", string(debug.Stack())).Fatalf("apiserver panic: %v", err) } }() - logrus.Fatalf("apiserver exited: %v", command.ExecuteContext(ctx)) + err := command.ExecuteContext(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + logrus.Errorf("apiserver exited: %v", err) + os.Exit(1) + } + os.Exit(0) }() return nil @@ -140,7 +175,12 @@ func (e *Embedded) Scheduler(ctx context.Context, apiReady <-chan struct{}, args logrus.WithField("stack", string(debug.Stack())).Fatalf("scheduler panic: %v", err) } }() - logrus.Fatalf("scheduler exited: %v", command.ExecuteContext(ctx)) + err := command.ExecuteContext(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + logrus.Errorf("scheduler exited: %v", err) + os.Exit(1) + } + os.Exit(0) }() return nil @@ -157,7 +197,12 @@ func (*Embedded) ControllerManager(ctx context.Context, apiReady <-chan struct{} logrus.WithField("stack", string(debug.Stack())).Fatalf("controller-manager panic: %v", err) } }() - logrus.Fatalf("controller-manager exited: %v", command.ExecuteContext(ctx)) + err := command.ExecuteContext(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + logrus.Errorf("controller-manager exited: %v", err) + os.Exit(1) + } + os.Exit(0) }() return nil @@ -170,18 +215,25 @@ func (*Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan } cloudInitializer := func(config *cloudcontrollerconfig.CompletedConfig) cloudprovider.Interface { - cloud, err := ccm.InitCloudProvider(version.Program, config.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile) + cloud, err := cloudprovider.InitCloudProvider(version.Program, config.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile) if err != nil { logrus.Fatalf("Cloud provider could not be initialized: %v", err) } if cloud == nil { logrus.Fatalf("Cloud provider is nil") } - return cloud } - command := ccmapp.NewCloudControllerManagerCommand(ccmOptions, cloudInitializer, ccmapp.DefaultInitFuncConstructors, cliflag.NamedFlagSets{}, ctx.Done()) + controllerAliases := names.CCMControllerAliases() + + command := ccmapp.NewCloudControllerManagerCommand( + ccmOptions, + cloudInitializer, + ccmapp.DefaultInitFuncConstructors, + controllerAliases, + cliflag.NamedFlagSets{}, + ctx.Done()) command.SetArgs(args) go func() { @@ -191,7 +243,12 @@ func (*Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan logrus.WithField("stack", string(debug.Stack())).Fatalf("cloud-controller-manager panic: %v", err) } }() - logrus.Errorf("cloud-controller-manager exited: %v", command.ExecuteContext(ctx)) + err := command.ExecuteContext(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + logrus.Errorf("cloud-controller-manager exited: %v", err) + os.Exit(1) + } + os.Exit(0) }() return nil @@ -201,6 +258,14 @@ func (e *Embedded) CurrentETCDOptions() (InitialOptions, error) { return InitialOptions{}, nil } +func (e *Embedded) Containerd(ctx context.Context, cfg *daemonconfig.Node) error { + return containerd.Run(ctx, cfg) +} + +func (e *Embedded) Docker(ctx context.Context, cfg *daemonconfig.Node) error { + return cridockerd.Run(ctx, cfg) +} + // waitForUntaintedNode watches nodes, waiting to find one not tainted as // uninitialized by the external cloud provider. func waitForUntaintedNode(ctx context.Context, kubeConfig string) error { @@ -245,4 +310,4 @@ func getCloudTaint(taints []v1.Taint) *v1.Taint { } } return nil -} +} \ No newline at end of file diff --git a/pkg/daemons/executor/embed_linux.go b/pkg/daemons/executor/embed_linux.go new file mode 100644 index 00000000..f4523be9 --- /dev/null +++ b/pkg/daemons/executor/embed_linux.go @@ -0,0 +1,16 @@ +//go:build linux && !no_embedded_executor +// +build linux,!no_embedded_executor + +package executor + +import ( + daemonconfig "github.com/xiaods/k8e/pkg/daemons/config" + + // registering k8e cloud provider + _ "github.com/xiaods/k8e/pkg/cloudprovider" +) + +func platformKubeProxyArgs(nodeConfig *daemonconfig.Node) map[string]string { + argsMap := map[string]string{} + return argsMap +} \ No newline at end of file diff --git a/pkg/daemons/executor/embed_windows.go b/pkg/daemons/executor/embed_windows.go new file mode 100644 index 00000000..02ae0ff6 --- /dev/null +++ b/pkg/daemons/executor/embed_windows.go @@ -0,0 +1,90 @@ +//go:build windows && !no_embedded_executor +// +build windows,!no_embedded_executor + +package executor + +import ( + "encoding/json" + "os" + "os/exec" + "strings" + "time" + + "github.com/Microsoft/hcsshim" + "github.com/sirupsen/logrus" + + // registering k8e cloud provider + _ "github.com/xiaods/k8e/pkg/cloudprovider" + daemonconfig "github.com/xiaods/k8e/pkg/daemons/config" +) + +const ( + networkName = "flannel.4096" +) + +type SourceVipResponse struct { + IP4 struct { + IP string `json:"ip"` + } `json:"ip4"` +} + +func platformKubeProxyArgs(nodeConfig *daemonconfig.Node) map[string]string { + argsMap := map[string]string{} + argsMap["network-name"] = networkName + if sourceVip := waitForSourceVip(networkName, nodeConfig); sourceVip != "" { + argsMap["source-vip"] = sourceVip + } + return argsMap +} + +func waitForSourceVip(networkName string, nodeConfig *daemonconfig.Node) string { + for range time.Tick(time.Second * 5) { + network, err := hcsshim.GetHNSNetworkByName(networkName) + if err != nil { + logrus.WithError(err).Warningf("can't find HNS network, retrying %s", networkName) + continue + } + if network.ManagementIP == "" { + logrus.WithError(err).Warningf("wait for management IP, retrying %s", networkName) + continue + } + + subnet := network.Subnets[0].AddressPrefix + + configData := `{ + "cniVersion": "0.2.0", + "name": "vxlan0", + "ipam": { + "type": "host-local", + "ranges": [[{"subnet":"` + subnet + `"}]], + "dataDir": "/var/lib/cni/networks" + } + }` + + cmd := exec.Command("host-local.exe") + cmd.Env = append(os.Environ(), + "CNI_COMMAND=ADD", + "CNI_CONTAINERID=dummy", + "CNI_NETNS=dummy", + "CNI_IFNAME=dummy", + "CNI_PATH="+nodeConfig.AgentConfig.CNIBinDir, + ) + + cmd.Stdin = strings.NewReader(configData) + out, err := cmd.Output() + if err != nil { + logrus.WithError(err).Warning("Failed to execute host-local.exe") + continue + } + + var sourceVipResp SourceVipResponse + err = json.Unmarshal(out, &sourceVipResp) + if err != nil { + logrus.WithError(err).Warning("Failed to unmarshal sourceVip response") + continue + } + + return strings.TrimSpace(strings.Split(sourceVipResp.IP4.IP, "/")[0]) + } + return "" +} \ No newline at end of file diff --git a/pkg/daemons/executor/etcd.go b/pkg/daemons/executor/etcd.go index bf276dad..f7da0b74 100644 --- a/pkg/daemons/executor/etcd.go +++ b/pkg/daemons/executor/etcd.go @@ -6,9 +6,9 @@ import ( "os" "path/filepath" - "github.com/sirupsen/logrus" daemonconfig "github.com/xiaods/k8e/pkg/daemons/config" "github.com/xiaods/k8e/pkg/version" + "github.com/sirupsen/logrus" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" ) @@ -44,6 +44,7 @@ func (e *Embedded) ETCD(ctx context.Context, args ETCDConfig, extraArgs []string logrus.Infof("This node has been removed from the cluster - please restart %s to rejoin the cluster", version.Program) return } + logrus.Errorf("etcd error: %v", err) case <-ctx.Done(): logrus.Infof("stopping etcd") etcd.Close() @@ -54,4 +55,4 @@ func (e *Embedded) ETCD(ctx context.Context, args ETCDConfig, extraArgs []string } }() return nil -} +} \ No newline at end of file diff --git a/pkg/daemons/executor/executor.go b/pkg/daemons/executor/executor.go index 33c83656..a3deb0a6 100644 --- a/pkg/daemons/executor/executor.go +++ b/pkg/daemons/executor/executor.go @@ -23,6 +23,7 @@ var ( type Executor interface { Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error Kubelet(ctx context.Context, args []string) error + KubeProxy(ctx context.Context, args []string) error APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error) APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error @@ -30,26 +31,30 @@ type Executor interface { CurrentETCDOptions() (InitialOptions, error) ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error + Containerd(ctx context.Context, node *daemonconfig.Node) error + Docker(ctx context.Context, node *daemonconfig.Node) error } type ETCDConfig struct { - InitialOptions `json:",inline"` - Name string `json:"name,omitempty"` - ListenClientURLs string `json:"listen-client-urls,omitempty"` - ListenClientHTTPURLs string `json:"listen-client-http-urls,omitempty"` - ListenMetricsURLs string `json:"listen-metrics-urls,omitempty"` - ListenPeerURLs string `json:"listen-peer-urls,omitempty"` - AdvertiseClientURLs string `json:"advertise-client-urls,omitempty"` - DataDir string `json:"data-dir,omitempty"` - SnapshotCount int `json:"snapshot-count,omitempty"` - ServerTrust ServerTrust `json:"client-transport-security"` - PeerTrust PeerTrust `json:"peer-transport-security"` - ForceNewCluster bool `json:"force-new-cluster,omitempty"` - HeartbeatInterval int `json:"heartbeat-interval"` - ElectionTimeout int `json:"election-timeout"` - Logger string `json:"logger"` - LogOutputs []string `json:"log-outputs"` - ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` + InitialOptions `json:",inline"` + Name string `json:"name,omitempty"` + ListenClientURLs string `json:"listen-client-urls,omitempty"` + ListenClientHTTPURLs string `json:"listen-client-http-urls,omitempty"` + ListenMetricsURLs string `json:"listen-metrics-urls,omitempty"` + ListenPeerURLs string `json:"listen-peer-urls,omitempty"` + AdvertiseClientURLs string `json:"advertise-client-urls,omitempty"` + DataDir string `json:"data-dir,omitempty"` + SnapshotCount int `json:"snapshot-count,omitempty"` + ServerTrust ServerTrust `json:"client-transport-security"` + PeerTrust PeerTrust `json:"peer-transport-security"` + ForceNewCluster bool `json:"force-new-cluster,omitempty"` + HeartbeatInterval int `json:"heartbeat-interval"` + ElectionTimeout int `json:"election-timeout"` + Logger string `json:"logger"` + LogOutputs []string `json:"log-outputs"` + + ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` + ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"` } type ServerTrust struct { @@ -137,6 +142,10 @@ func Kubelet(ctx context.Context, args []string) error { return executor.Kubelet(ctx, args) } +func KubeProxy(ctx context.Context, args []string) error { + return executor.KubeProxy(ctx, args) +} + func APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error) { return executor.APIServerHandlers(ctx) } @@ -164,3 +173,11 @@ func ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error { func CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error { return executor.CloudControllerManager(ctx, ccmRBACReady, args) } + +func Containerd(ctx context.Context, config *daemonconfig.Node) error { + return executor.Containerd(ctx, config) +} + +func Docker(ctx context.Context, config *daemonconfig.Node) error { + return executor.Docker(ctx, config) +} \ No newline at end of file diff --git a/pkg/util/command.go b/pkg/util/command.go new file mode 100644 index 00000000..a70b6ece --- /dev/null +++ b/pkg/util/command.go @@ -0,0 +1,21 @@ +package util + +import ( + "bytes" + "os/exec" +) + +// ExecCommand executes a command using the VPN binary +// In case of error != nil, the string returned var will have more information +func ExecCommand(command string, args []string) (string, error) { + var out, errOut bytes.Buffer + + cmd := exec.Command(command, args...) + cmd.Stdout = &out + cmd.Stderr = &errOut + err := cmd.Run() + if err != nil { + return errOut.String(), err + } + return out.String(), nil +} \ No newline at end of file From b1b8eef2434ab4480bcfd1ce96becbb42b8ddad8 Mon Sep 17 00:00:00 2001 From: Tommy Xiao Date: Mon, 21 Oct 2024 23:05:52 +0800 Subject: [PATCH 2/2] update some files --- go.mod | 2 +- pkg/agent/run.go | 5 -- pkg/daemons/executor/embed.go | 22 ------ pkg/node/controller.go | 120 +++++++++++++++++++++-------- pkg/util/services/services_test.go | 12 --- 5 files changed, 89 insertions(+), 72 deletions(-) diff --git a/go.mod b/go.mod index 2d8b0c21..6ab93322 100644 --- a/go.mod +++ b/go.mod @@ -74,6 +74,7 @@ replace ( ) require ( + github.com/Microsoft/hcsshim v0.12.6 github.com/Mirantis/cri-dockerd v0.0.0-00010101000000-000000000000 github.com/blang/semver/v4 v4.0.0 github.com/containerd/aufs v1.0.0 @@ -182,7 +183,6 @@ require ( github.com/JeffAshton/win_pdh v0.0.0-20161109143554-76bb4ee9f0ab // indirect github.com/MakeNowJust/heredoc v1.0.0 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect - github.com/Microsoft/hcsshim v0.12.6 // indirect github.com/NYTimes/gziphandler v1.1.1 // indirect github.com/Rican7/retry v0.1.0 // indirect github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e // indirect diff --git a/pkg/agent/run.go b/pkg/agent/run.go index eff857ce..98711cf1 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -90,11 +90,6 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error { return fmt.Errorf("dual-stack or IPv6 are not supported on Windows node") } - conntrackConfig, err := getConntrackConfig(nodeConfig) - if err != nil { - return errors.Wrap(err, "failed to validate kube-proxy conntrack configuration") - } - syssetup.Configure(enableIPv6, conntrackConfig) nodeConfig.AgentConfig.EnableIPv4 = enableIPv4 nodeConfig.AgentConfig.EnableIPv6 = enableIPv6 diff --git a/pkg/daemons/executor/embed.go b/pkg/daemons/executor/embed.go index 51ee80d2..d54582ae 100644 --- a/pkg/daemons/executor/embed.go +++ b/pkg/daemons/executor/embed.go @@ -40,7 +40,6 @@ import ( "k8s.io/klog/v2" apiapp "k8s.io/kubernetes/cmd/kube-apiserver/app" cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app" - proxy "k8s.io/kubernetes/cmd/kube-proxy/app" sapp "k8s.io/kubernetes/cmd/kube-scheduler/app" kubelet "k8s.io/kubernetes/cmd/kubelet/app" @@ -104,27 +103,6 @@ func (e *Embedded) Kubelet(ctx context.Context, args []string) error { return nil } -func (e *Embedded) KubeProxy(ctx context.Context, args []string) error { - command := proxy.NewProxyCommand() - command.SetArgs(daemonconfig.GetArgs(platformKubeProxyArgs(e.nodeConfig), args)) - - go func() { - defer func() { - if err := recover(); err != nil { - logrus.WithField("stack", string(debug.Stack())).Fatalf("kube-proxy panic: %v", err) - } - }() - err := command.ExecuteContext(ctx) - if err != nil && !errors.Is(err, context.Canceled) { - logrus.Errorf("kube-proxy exited: %v", err) - os.Exit(1) - } - os.Exit(0) - }() - - return nil -} - func (*Embedded) APIServerHandlers(ctx context.Context) (authenticator.Request, http.Handler, error) { startupConfig := <-apiapp.StartupConfig return startupConfig.Authenticator, startupConfig.Handler, nil diff --git a/pkg/node/controller.go b/pkg/node/controller.go index 70ab6013..5736aae7 100644 --- a/pkg/node/controller.go +++ b/pkg/node/controller.go @@ -1,14 +1,19 @@ package node import ( + "bytes" "context" + "net" + "sort" "strings" + "github.com/xiaods/k8e/pkg/nodepassword" "github.com/pkg/errors" - coreclient "github.com/rancher/wrangler/pkg/generated/controllers/core/v1" + coreclient "github.com/rancher/wrangler/v3/pkg/generated/controllers/core/v1" "github.com/sirupsen/logrus" - "github.com/xiaods/k8e/pkg/nodepassword" core "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func Register(ctx context.Context, @@ -47,14 +52,22 @@ func (h *handler) onRemove(key string, node *core.Node) (*core.Node, error) { func (h *handler) updateHosts(node *core.Node, removed bool) (*core.Node, error) { var ( - nodeName string - nodeAddress string + nodeName string + hostName string + nodeIPv4 string + nodeIPv6 string ) nodeName = node.Name for _, address := range node.Status.Addresses { - if address.Type == "InternalIP" { - nodeAddress = address.Address - break + switch address.Type { + case v1.NodeInternalIP: + if strings.Contains(address.Address, ":") { + nodeIPv6 = address.Address + } else { + nodeIPv4 = address.Address + } + case v1.NodeHostName: + hostName = address.Address } } if removed { @@ -63,58 +76,101 @@ func (h *handler) updateHosts(node *core.Node, removed bool) (*core.Node, error) } } if h.modCoreDNS { - if err := h.updateCoreDNSConfigMap(nodeName, nodeAddress, removed); err != nil { + if err := h.updateCoreDNSConfigMap(nodeName, hostName, nodeIPv4, nodeIPv6, removed); err != nil { return nil, err } } return nil, nil } -func (h *handler) updateCoreDNSConfigMap(nodeName, nodeAddress string, removed bool) error { - if nodeAddress == "" && !removed { - logrus.Errorf("No InternalIP found for node " + nodeName) +func (h *handler) updateCoreDNSConfigMap(nodeName, hostName, nodeIPv4, nodeIPv6 string, removed bool) error { + if removed { + nodeIPv4 = "" + nodeIPv6 = "" + } else if nodeIPv4 == "" && nodeIPv6 == "" { + logrus.Errorf("No InternalIP addresses found for node " + nodeName) return nil } - configMapCache, err := h.configMaps.Cache().Get("kube-system", "coredns") - if err != nil || configMapCache == nil { + nodeNames := nodeName + if hostName != nodeName { + nodeNames += " " + hostName + } + + configMap, err := h.configMaps.Get("kube-system", "coredns", metav1.GetOptions{}) + if err != nil || configMap == nil { logrus.Warn(errors.Wrap(err, "Unable to fetch coredns config map")) return nil } - configMap := configMapCache.DeepCopy() - hosts := configMap.Data["NodeHosts"] - hostsMap := map[string]string{} + addressMap := map[string]string{} - for _, line := range strings.Split(hosts, "\n") { + // extract current entries from hosts file, skipping any entries that are + // empty, unparsable, or hold an incorrect address for the current node. + for _, line := range strings.Split(configMap.Data["NodeHosts"], "\n") { + line, _, _ = strings.Cut(line, "#") if line == "" { continue } fields := strings.Fields(line) - if len(fields) != 2 { + if len(fields) < 2 { logrus.Warnf("Unknown format for hosts line [%s]", line) continue } ip := fields[0] - host := fields[1] - if host == nodeName { - if removed { - continue - } - if ip == nodeAddress { - return nil + if fields[1] == nodeName { + if strings.Contains(ip, ":") { + if ip != nodeIPv6 { + continue + } + } else { + if ip != nodeIPv4 { + continue + } } } - hostsMap[host] = ip + names := strings.Join(fields[1:], " ") + addressMap[ip] = names } - if !removed { - hostsMap[nodeName] = nodeAddress + // determine what names we should have for each address family + var namesv6, namesv4 string + if nodeIPv4 != "" { + namesv4 = nodeNames + } + if nodeIPv6 != "" { + namesv6 = nodeNames } + // don't need to do anything if the addresses are in sync + if !removed && addressMap[nodeIPv4] == namesv4 && addressMap[nodeIPv6] == namesv6 { + return nil + } + + // Something's out of sync, set the desired entries + if nodeIPv4 != "" { + addressMap[nodeIPv4] = namesv4 + } + if nodeIPv6 != "" { + addressMap[nodeIPv6] = namesv6 + } + + // sort addresses by IP + addresses := make([]string, 0, len(addressMap)) + for ip := range addressMap { + addresses = append(addresses, ip) + } + sort.Slice(addresses, func(i, j int) bool { + return bytes.Compare(net.ParseIP(addresses[i]), net.ParseIP(addresses[j])) < 0 + }) + var newHosts string - for host, ip := range hostsMap { - newHosts += ip + " " + host + "\n" + for _, ip := range addresses { + newHosts += ip + " " + addressMap[ip] + "\n" + } + + if configMap.Data == nil { + configMap.Data = map[string]string{} } configMap.Data["NodeHosts"] = newHosts @@ -128,10 +184,10 @@ func (h *handler) updateCoreDNSConfigMap(nodeName, nodeAddress string, removed b } else { actionType = "Updated" } - logrus.Infof("%s coredns node hosts entry [%s]", actionType, nodeAddress+" "+nodeName) + logrus.Infof("%s coredns NodeHosts entry for %s", actionType, nodeName) return nil } func (h *handler) removeNodePassword(nodeName string) error { return nodepassword.Delete(h.secrets, nodeName) -} +} \ No newline at end of file diff --git a/pkg/util/services/services_test.go b/pkg/util/services/services_test.go index 5e32d618..0ad3319b 100644 --- a/pkg/util/services/services_test.go +++ b/pkg/util/services/services_test.go @@ -70,12 +70,6 @@ func Test_UnitFilesForServices(t *testing.T) { "/var/lib/rancher/k8e/agent/client-k8e-controller.crt", "/var/lib/rancher/k8e/agent/client-k8e-controller.key", }, - "kube-proxy": []string{ - "/var/lib/rancher/k8e/server/tls/client-kube-proxy.crt", - "/var/lib/rancher/k8e/server/tls/client-kube-proxy.key", - "/var/lib/rancher/k8e/agent/client-kube-proxy.crt", - "/var/lib/rancher/k8e/agent/client-kube-proxy.key", - }, "kubelet": []string{ "/var/lib/rancher/k8e/server/tls/client-kubelet.key", "/var/lib/rancher/k8e/server/tls/serving-kubelet.key", @@ -168,12 +162,6 @@ func Test_UnitFilesForServices(t *testing.T) { "/var/lib/rancher/k8e/agent/client-k8e-controller.crt", "/var/lib/rancher/k8e/agent/client-k8e-controller.key", }, - "kube-proxy": []string{ - "/var/lib/rancher/k8e/server/tls/client-kube-proxy.crt", - "/var/lib/rancher/k8e/server/tls/client-kube-proxy.key", - "/var/lib/rancher/k8e/agent/client-kube-proxy.crt", - "/var/lib/rancher/k8e/agent/client-kube-proxy.key", - }, "kubelet": []string{ "/var/lib/rancher/k8e/server/tls/client-kubelet.key", "/var/lib/rancher/k8e/server/tls/serving-kubelet.key",