Skip to content

Commit

Permalink
Merge pull request #514 from l1b0k/feat/config
Browse files Browse the repository at this point in the history
daemon: simplify config
  • Loading branch information
BSWANG authored Aug 9, 2023
2 parents 5e005f1 + 79983d9 commit 6096f32
Show file tree
Hide file tree
Showing 25 changed files with 361 additions and 703 deletions.
6 changes: 4 additions & 2 deletions cmd/terway-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"net"
"time"

"github.com/AliyunContainerService/terway/rpc"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/AliyunContainerService/terway/rpc"
)

const (
Expand All @@ -32,7 +34,7 @@ var (
PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
// create connection and grpc client
ctx, contextCancel = context.WithTimeout(context.Background(), connTimeout)
conn, err := grpc.DialContext(ctx, defaultSocketPath, grpc.WithInsecure(), grpc.WithContextDialer(
conn, err := grpc.DialContext(ctx, defaultSocketPath, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(
func(ctx context.Context, s string) (net.Conn, error) {
unixAddr, err := net.ResolveUnixAddr("unix", defaultSocketPath)
if err != nil {
Expand Down
28 changes: 18 additions & 10 deletions cmd/terway-controlplane/terway-controlplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ import (
"math/rand"
"time"

"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/metrics"

aliyun "github.com/AliyunContainerService/terway/pkg/aliyun/client"
"github.com/AliyunContainerService/terway/pkg/aliyun/credential"
"github.com/AliyunContainerService/terway/pkg/apis/crds"
Expand All @@ -36,15 +46,6 @@ import (
"github.com/AliyunContainerService/terway/pkg/utils"
"github.com/AliyunContainerService/terway/pkg/version"
"github.com/AliyunContainerService/terway/types/controlplane"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

var (
Expand Down Expand Up @@ -116,7 +117,14 @@ func main() {
}
}

clientSet, err := credential.NewClientSet(string(cfg.Credential.AccessKey), string(cfg.Credential.AccessSecret), cfg.RegionID, cfg.CredentialPath, cfg.SecretNamespace, cfg.SecretName)
var providers []credential.Interface
if string(cfg.Credential.AccessKey) != "" && string(cfg.Credential.AccessSecret) != "" {
providers = append(providers, credential.NewAKPairProvider(string(cfg.Credential.AccessKey), string(cfg.Credential.AccessSecret)))
}
providers = append(providers, credential.NewEncryptedCredentialProvider(cfg.CredentialPath, cfg.SecretNamespace, cfg.SecretName))
providers = append(providers, credential.NewMetadataProvider())

clientSet, err := credential.NewClientMgr(cfg.RegionID, providers...)
if err != nil {
panic(err)
}
Expand Down
29 changes: 20 additions & 9 deletions cmd/terway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@ package main

import (
"flag"
"math/rand"
"os"
"time"

"github.com/AliyunContainerService/terway/daemon"
"github.com/AliyunContainerService/terway/pkg/logger"
"github.com/AliyunContainerService/terway/pkg/utils"
"github.com/AliyunContainerService/terway/pkg/version"

"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
ctrl "sigs.k8s.io/controller-runtime"
)

var (
log = ctrl.Log.WithName("setup")
)

const defaultConfigPath = "/etc/eni/eni.json"
Expand All @@ -19,27 +27,30 @@ var (
logLevel string
daemonMode string
readonlyListen string
kubeconfig string
master string
)

func main() {
rand.New(rand.NewSource(time.Now().UnixNano()))
klog.InitFlags(nil)
defer klog.Flush()

ctrl.SetLogger(klogr.New())
log.Info(version.Version)

fs := flag.NewFlagSet("terway", flag.ExitOnError)

fs.StringVar(&daemonMode, "daemon-mode", "VPC", "terway network mode")
fs.StringVar(&logLevel, "log-level", "info", "terway log level")
fs.StringVar(&readonlyListen, "readonly-listen", utils.NormalizePath(debugSocketPath), "terway readonly listen")
fs.StringVar(&master, "master", "", "The address of the Kubernetes API server (overrides any value in kubeconfig).")
fs.StringVar(&kubeconfig, "kubeconfig", "", "Path to kubeconfig file with authorization and master location information.")
err := fs.Parse(os.Args[1:])
if err != nil {
panic(err)
}

klog.SetOutput(os.Stderr)
ctx := ctrl.SetupSignalHandler()
err = daemon.Run(ctx, utils.NormalizePath(defaultSocketPath), readonlyListen, utils.NormalizePath(defaultConfigPath), daemonMode, logLevel)

logger.DefaultLogger.Info(version.Version)
if err := daemon.Run(utils.NormalizePath(defaultSocketPath), readonlyListen, utils.NormalizePath(defaultConfigPath), kubeconfig, master, daemonMode, logLevel); err != nil {
logger.DefaultLogger.Fatal(err)
if err != nil {
klog.Fatal(err)
}
}
119 changes: 43 additions & 76 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/AliyunContainerService/terway/pkg/aliyun"
"github.com/AliyunContainerService/terway/pkg/aliyun/client"
"github.com/AliyunContainerService/terway/pkg/aliyun/credential"
podENITypes "github.com/AliyunContainerService/terway/pkg/apis/network.alibabacloud.com/v1beta1"
"github.com/AliyunContainerService/terway/pkg/backoff"
terwayIP "github.com/AliyunContainerService/terway/pkg/ip"
Expand All @@ -32,6 +33,7 @@ import (
corev1 "k8s.io/api/core/v1"
k8sErr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/flowcontrol"
)

const (
Expand All @@ -45,39 +47,33 @@ const (
conditionFalse = "false"
conditionTrue = "true"

networkServiceName = "default"
tracingKeyName = "name"
tracingKeyDaemonMode = "daemon_mode"
tracingKeyConfigFilePath = "config_file_path"
tracingKeyKubeConfig = "kubeconfig"
tracingKeyMaster = "master"
networkServiceName = "default"
tracingKeyName = "name"
tracingKeyDaemonMode = "daemon_mode"
tracingKeyConfigFilePath = "config_file_path"

tracingKeyPendingPodsCount = "pending_pods_count"

commandMapping = "mapping"

cniDefaultPath = "/opt/cni/bin"

IfEth0 = "eth0"
)

type networkService struct {
daemonMode string
configFilePath string
kubeConfig string
master string
k8s Kubernetes
resourceDB storage.Storage
vethResMgr ResourceManager
eniResMgr ResourceManager
eniIPResMgr ResourceManager
eipResMgr ResourceManager

k8s Kubernetes
resourceDB storage.Storage
vethResMgr ResourceManager
eniResMgr ResourceManager
eniIPResMgr ResourceManager
eipResMgr ResourceManager
//networkResourceMgr ResourceManager
mgrForResource map[string]ResourceManager
pendingPods sync.Map
sync.RWMutex

cniBinPath string

enableTrunk bool

ipFamily *types.IPFamily
Expand Down Expand Up @@ -1108,8 +1104,6 @@ func (n *networkService) Config() []tracing.MapKeyValueEntry {
{Key: tracingKeyName, Value: networkServiceName}, // use a unique name?
{Key: tracingKeyDaemonMode, Value: n.daemonMode},
{Key: tracingKeyConfigFilePath, Value: n.configFilePath},
{Key: tracingKeyKubeConfig, Value: n.kubeConfig},
{Key: tracingKeyMaster, Value: n.master},
}

return config
Expand Down Expand Up @@ -1258,18 +1252,12 @@ func toResMapping(poolStats tracing.ResourcePoolStats, pods []interface{}) ([]*t
return mapping, nil
}

func newNetworkService(configFilePath, kubeconfig, master, daemonMode string) (rpc.TerwayBackendServer, error) {
func newNetworkService(ctx context.Context, configFilePath, daemonMode string) (rpc.TerwayBackendServer, error) {
serviceLog.Debugf("start network service with: %s, %s", configFilePath, daemonMode)
cniBinPath := os.Getenv("CNI_PATH")
if cniBinPath == "" {
cniBinPath = cniDefaultPath
}

netSrv := &networkService{
configFilePath: configFilePath,
kubeConfig: kubeconfig,
master: master,
pendingPods: sync.Map{},
cniBinPath: utils.NormalizePath(cniBinPath),
}
if daemonMode == daemonModeENIMultiIP || daemonMode == daemonModeVPC || daemonMode == daemonModeENIOnly {
netSrv.daemonMode = daemonMode
Expand All @@ -1284,9 +1272,9 @@ func newNetworkService(configFilePath, kubeconfig, master, daemonMode string) (r
return nil, err
}

netSrv.k8s, err = newK8S(master, kubeconfig, daemonMode, globalConfig)
netSrv.k8s, err = newK8S(daemonMode, globalConfig)
if err != nil {
return nil, errors.Wrapf(err, "error init k8s service")
return nil, fmt.Errorf("error init k8s: %w", err)
}

// load dynamic config
Expand All @@ -1301,35 +1289,46 @@ func newNetworkService(configFilePath, kubeconfig, master, daemonMode string) (r
return nil, fmt.Errorf("failed parse config: %v", err)
}

backoff.OverrideBackoff(config.BackoffOverride)

if len(dynamicCfg) == 0 {
serviceLog.Infof("got config: %+v from: %+v", config, configFilePath)
} else {
serviceLog.Infof("got config: %+v from %+v, with dynamic config %+v", config, configFilePath, nodeLabel)
}

if err := validateConfig(config); err != nil {
return nil, err
}

if err := setDefault(config); err != nil {
config.Populate()
err = config.Validate()
if err != nil {
return nil, err
}

backoff.OverrideBackoff(config.BackoffOverride)
_ = netSrv.k8s.SetCustomStatefulWorkloadKinds(config.CustomStatefulWorkloadKinds)
netSrv.ipamType = config.IPAMType
netSrv.eniCapPolicy = config.ENICapPolicy

ins := aliyun.GetInstanceMeta()
ipFamily := types.NewIPFamilyFromIPStack(types.IPStack(config.IPStack))
netSrv.ipFamily = ipFamily

aliyunClient, err := client.NewAliyun(config.AccessID, config.AccessSecret, ins.RegionID, utils.NormalizePath(config.CredentialPath), "", "")
var providers []credential.Interface
if string(config.AccessID) != "" && string(config.AccessSecret) != "" {
providers = append(providers, credential.NewAKPairProvider(string(config.AccessID), string(config.AccessSecret)))
}
providers = append(providers, credential.NewEncryptedCredentialProvider(utils.NormalizePath(config.CredentialPath), "", ""))
providers = append(providers, credential.NewMetadataProvider())

clientSet, err := credential.NewClientMgr(config.RegionID, providers...)
if err != nil {
return nil, err
}

aliyunClient, err := client.New(clientSet,
flowcontrol.NewTokenBucketRateLimiter(8, 10),
flowcontrol.NewTokenBucketRateLimiter(4, 5))
if err != nil {
return nil, errors.Wrapf(err, "error create aliyun client")
}

limit, err := aliyun.GetLimit(aliyunClient, ins.InstanceType)
limit, err := aliyun.GetLimit(aliyunClient, config.InstanceType)
if err != nil {
return nil, fmt.Errorf("upable get instance limit, %w", err)
}
Expand All @@ -1339,7 +1338,7 @@ func newNetworkService(configFilePath, kubeconfig, master, daemonMode string) (r
serviceLog.Warnf("instance %s is not support ipv6", aliyun.GetInstanceMeta().InstanceType)
} else if daemonMode == daemonModeENIMultiIP && !limit.SupportMultiIPIPv6() {
ipFamily.IPv6 = false
serviceLog.Warnf("instance %s is not support ipv6", aliyun.GetInstanceMeta().InstanceType)
serviceLog.Warnf("instance %s is not support ipv6", config.InstanceType)
}
}

Expand All @@ -1361,8 +1360,6 @@ func newNetworkService(configFilePath, kubeconfig, master, daemonMode string) (r
return nil, errors.Wrapf(err, "error set k8s svcCidr")
}

_ = netSrv.k8s.SetCustomStatefulWorkloadKinds(config.CustomStatefulWorkloadKinds)

netSrv.resourceDB, err = storage.NewDiskStorage(
resDBName, utils.NormalizePath(resDBPath), json.Marshal, func(bytes []byte) (interface{}, error) {
resourceRel := &types.PodResources{}
Expand All @@ -1377,7 +1374,7 @@ func newNetworkService(configFilePath, kubeconfig, master, daemonMode string) (r
}

// get pool config
poolConfig, err := getPoolConfig(config, config.IPAMType)
poolConfig, err := getPoolConfig(config)
if err != nil {
return nil, errors.Wrapf(err, "error get pool config")
}
Expand Down Expand Up @@ -1476,42 +1473,12 @@ func newNetworkService(configFilePath, kubeconfig, master, daemonMode string) (r
return netSrv, nil
}

// setup default value
func setDefault(cfg *daemon.Config) error {
if cfg.EniCapRatio == 0 {
cfg.EniCapRatio = 1
}

// Default policy for vswitch selection is random.
if cfg.VSwitchSelectionPolicy == "" {
cfg.VSwitchSelectionPolicy = types.VSwitchSelectionPolicyRandom
}

if cfg.IPStack == "" {
cfg.IPStack = string(types.IPStackIPv4)
}

return nil
}

func validateConfig(cfg *daemon.Config) error {
switch cfg.IPStack {
case "", string(types.IPStackIPv4), string(types.IPStackDual):
default:
return fmt.Errorf("unsupported ipStack %s in configMap", cfg.IPStack)
}

return nil
}

func getPoolConfig(cfg *daemon.Config, ipamType types.IPAMType) (*types.PoolConfig, error) {
func getPoolConfig(cfg *daemon.Config) (*types.PoolConfig, error) {
poolConfig := &types.PoolConfig{
MaxPoolSize: cfg.MaxPoolSize,
MinPoolSize: cfg.MinPoolSize,
MaxENI: cfg.MaxENI,
MinENI: cfg.MinENI,
AccessID: cfg.AccessID,
AccessSecret: cfg.AccessSecret,
EniCapRatio: cfg.EniCapRatio,
EniCapShift: cfg.EniCapShift,
SecurityGroups: cfg.GetSecurityGroups(),
Expand Down Expand Up @@ -1540,7 +1507,7 @@ func getPoolConfig(cfg *daemon.Config, ipamType types.IPAMType) (*types.PoolConf
poolConfig.VPC = ins.VPCID
poolConfig.InstanceID = ins.InstanceID

if ipamType == types.IPAMTypeCRD {
if cfg.IPAMType == types.IPAMTypeCRD {
poolConfig.MaxPoolSize = 0
poolConfig.MinPoolSize = 0
poolConfig.MaxENI = 0
Expand Down
Loading

0 comments on commit 6096f32

Please sign in to comment.