Skip to content

Commit

Permalink
support different platforms and connectivity scenarios
Browse files Browse the repository at this point in the history
These are the combinations

Two type of platforms: containers reachable or not reachable from host
Two type of deployments: cloud-provider-kind as a container or as a
proces.

Containers Reachable and KCCM Process: Direct (Linux)
Containers Reachable and KCCM container: Direct (Linux)
Containers Not Rechable and KCCM Process: Tunnel (Mac, Windows, WSL2)
Containers Not Rechable and KCCM Container: Portmap (Mac, Windows, WSL2)

Change-Id: Ife20049c0809a73a340600644083bb542cc6f0fe
  • Loading branch information
aojea committed Aug 30, 2024
1 parent 827fdb1 commit 2edf524
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 25 deletions.
32 changes: 31 additions & 1 deletion cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ import (
"fmt"
"os"
"os/signal"
"runtime"
"strconv"
"strings"
"syscall"

"k8s.io/component-base/logs"
"k8s.io/klog/v2"

"sigs.k8s.io/cloud-provider-kind/pkg/config"
"sigs.k8s.io/cloud-provider-kind/pkg/controller"
"sigs.k8s.io/kind/pkg/cluster"
kindcmd "sigs.k8s.io/kind/pkg/cmd"
)

Expand Down Expand Up @@ -98,5 +101,32 @@ func Main() {
klog.Infof("**** Dumping load balancers logs to: %s", logDumpDir)
}

controller.New(logger).Run(ctx)
// some platforms require to enable tunneling for the LoadBalancers
if runtime.GOOS == "darwin" || runtime.GOOS == "windows" || isWSL2() {
config.DefaultConfig.LoadBalancerConnectivity = config.Tunnel
}

// default control plane connectivity to portmap, it will be
// overriden if the first cluster added detects direct
// connecitivity
config.DefaultConfig.ControlPlaneConnectivity = config.Portmap

// initialize kind provider
option, err := cluster.DetectNodeProvider()
if err != nil {
klog.Fatalf("can not detect cluster provider: %v", err)
}
kindProvider := cluster.NewProvider(
option,
cluster.ProviderWithLogger(logger),
)
controller.New(kindProvider).Run(ctx)
}

func isWSL2() bool {
if v, err := os.ReadFile("/proc/version"); err == nil {
return strings.Contains(string(v), "WSL2")
}

return false
}
16 changes: 16 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,20 @@ var DefaultConfig = &Config{}
type Config struct {
EnableLogDump bool
LogDir string
// Platforms like Mac or Windows can not access the containers directly
// so we do a double hop, enable container portmapping for the LoadBalancer containter
// and do userspace proxying from the original port to the portmaps.
// If the cloud-provider-kind runs in a container on these platforms only enables portmapping.
LoadBalancerConnectivity Connectivity
// Type of connectivity between the cloud-provider-kind and the clusters
ControlPlaneConnectivity Connectivity
}

type Connectivity int

const (
Unknown Connectivity = iota
Direct
Portmap
Tunnel
)
28 changes: 18 additions & 10 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -20,14 +21,16 @@ import (
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
ccmfeatures "k8s.io/controller-manager/pkg/features"
"k8s.io/klog/v2"
cpkconfig "sigs.k8s.io/cloud-provider-kind/pkg/config"
"sigs.k8s.io/cloud-provider-kind/pkg/constants"
"sigs.k8s.io/cloud-provider-kind/pkg/container"
"sigs.k8s.io/cloud-provider-kind/pkg/loadbalancer"
"sigs.k8s.io/cloud-provider-kind/pkg/provider"
"sigs.k8s.io/kind/pkg/cluster"
"sigs.k8s.io/kind/pkg/log"
)

var once sync.Once

type Controller struct {
kind *cluster.Provider
clusters map[string]*ccm
Expand All @@ -40,12 +43,10 @@ type ccm struct {
cancelFn context.CancelFunc
}

func New(logger log.Logger) *Controller {
func New(provider *cluster.Provider) *Controller {
controllersmetrics.Register()
return &Controller{
kind: cluster.NewProvider(
cluster.ProviderWithLogger(logger),
),
kind: provider,
clusters: make(map[string]*ccm),
}
}
Expand Down Expand Up @@ -108,18 +109,17 @@ func (c *Controller) Run(ctx context.Context) {
}
}

// getKubeClient returns a kubeclient depending if the ccm runs inside a container
// inside the same docker network that the kind cluster or run externally in the host
// It tries first to connect to the external endpoint
// getKubeClient returns a kubeclient for the cluster passed as argument
// It tries first to connect to the internal endpoint.
func (c *Controller) getKubeClient(ctx context.Context, cluster string) (kubernetes.Interface, error) {
httpClient := &http.Client{
Timeout: 5 * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
// try internal first
for _, internal := range []bool{false, true} {
// prefer internal (direct connectivity) over no-internal (commonly portmap)
for _, internal := range []bool{true, false} {
kconfig, err := c.kind.KubeConfig(cluster, internal)
if err != nil {
klog.Errorf("Failed to get kubeconfig for cluster %s: %v", cluster, err)
Expand Down Expand Up @@ -157,6 +157,14 @@ func (c *Controller) getKubeClient(ctx context.Context, cluster string) (kuberne
klog.Errorf("Failed to create kubeClient for cluster %s: %v", cluster, err)
continue
}
// the first cluster will give us the type of connectivity between
// cloud-provider-kind and the clusters and load balancer containers.
// In Linux or containerized cloud-provider-kind this will be direct.
once.Do(func() {
if internal {
cpkconfig.DefaultConfig.ControlPlaneConnectivity = cpkconfig.Direct
}
})
return kubeClient, err
}
return nil, fmt.Errorf("can not find a working kubernetes clientset")
Expand Down
21 changes: 17 additions & 4 deletions pkg/loadbalancer/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"net"
"net/http"
"strconv"
"strings"
Expand All @@ -17,6 +18,7 @@ import (
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"

"sigs.k8s.io/cloud-provider-kind/pkg/config"
"sigs.k8s.io/cloud-provider-kind/pkg/container"
)

Expand Down Expand Up @@ -319,15 +321,26 @@ func waitLoadBalancerReady(ctx context.Context, name string, timeout time.Durati
if err != nil {
return err
}
port, ok := portmaps[strconv.Itoa(envoyAdminPort)]
if !ok {
return fmt.Errorf("envoy admin port %d not found, got %v", envoyAdminPort, portmaps)

var authority string
if config.DefaultConfig.ControlPlaneConnectivity == config.Direct {
ipv4, _, err := container.IPs(name)
if err != nil {
return err
}
authority = net.JoinHostPort(ipv4, strconv.Itoa(envoyAdminPort))
} else {
port, ok := portmaps[strconv.Itoa(envoyAdminPort)]
if !ok {
return fmt.Errorf("envoy admin port %d not found, got %v", envoyAdminPort, portmaps)
}
authority = net.JoinHostPort("127.0.0.1", port)
}

httpClient := http.DefaultClient
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, timeout, true, func(ctx context.Context) (done bool, err error) {
// iptables port forwarding on localhost only works for IPv4
resp, err := httpClient.Get(fmt.Sprintf("http://127.0.0.1:%s/ready", port))
resp, err := httpClient.Get(fmt.Sprintf("http://%s/ready", authority))
if err != nil {
klog.V(2).Infof("unexpected error trying to get load balancer %s readiness :%v", name, err)
return false, nil
Expand Down
12 changes: 2 additions & 10 deletions pkg/loadbalancer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"os"
"path"
"runtime"
"strings"

v1 "k8s.io/api/core/v1"
Expand All @@ -29,20 +28,13 @@ var _ cloudprovider.LoadBalancer = &Server{}

func NewServer() cloudprovider.LoadBalancer {
s := &Server{}
if runtime.GOOS == "darwin" || runtime.GOOS == "windows" || isWSL2() {

if config.DefaultConfig.LoadBalancerConnectivity == config.Tunnel {
s.tunnelManager = NewTunnelManager()
}
return s
}

func isWSL2() bool {
if v, err := os.ReadFile("/proc/version"); err == nil {
return strings.Contains(string(v), "WSL2")
}

return false
}

func (s *Server) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
// report status
name := loadBalancerName(clusterName, service)
Expand Down

0 comments on commit 2edf524

Please sign in to comment.