Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add replicas support for metasrv in bare-metal mode #145

Merged
merged 2 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/bare-metal/cluster-with-local-artifacts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ cluster:
mysqlAddr: 0.0.0.0:14200
httpAddr: 0.0.0.0:14300
meta:
replicas: 1
storeAddr: 127.0.0.1:2379
serverAddr: 0.0.0.0:3002
httpAddr: 0.0.0.0:14001
Expand Down
3 changes: 2 additions & 1 deletion examples/bare-metal/cluster.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cluster:
name: mycluster # name of the cluster
artifact:
version: v0.2.0-nightly-20230403
version: latest
frontend:
replicas: 1
datanode:
Expand All @@ -10,6 +10,7 @@ cluster:
mysqlAddr: 0.0.0.0:14200
httpAddr: 0.0.0.0:14300
meta:
replicas: 1
storeAddr: 127.0.0.1:2379
serverAddr: 0.0.0.0:3002
httpAddr: 0.0.0.0:14001
Expand Down
3 changes: 1 addition & 2 deletions pkg/cmd/gtctl/cluster/get/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,8 @@ func collectClusterInfoFromBareMetal(data *bmconfig.MetaConfig) (

rows(component.Frontend, data.Cluster.Frontend.Replicas)
rows(component.DataNode, data.Cluster.Datanode.Replicas)
rows(component.MetaSrv, data.Cluster.MetaSrv.Replicas)

// TODO(shawnh2) add metatsrv and etcd replicas support
bulk = append(bulk, []string{component.MetaSrv, pidsMap[component.MetaSrv]})
bulk = append(bulk, []string{component.Etcd, pidsMap[component.Etcd]})

config, err := yaml.Marshal(data.Config)
Expand Down
98 changes: 64 additions & 34 deletions pkg/deployer/baremetal/component/metasrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net"
"net/http"
"path"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -53,28 +54,37 @@ func (m *metaSrv) Name() string {
}

func (m *metaSrv) Start(ctx context.Context, binary string) error {
var (
metaSrvLogDir = path.Join(m.workingDirs.LogsDir, m.Name())
metaSrvPidDir = path.Join(m.workingDirs.PidsDir, m.Name())
metaSrvDirs = []string{metaSrvLogDir, metaSrvPidDir}
)
for _, dir := range metaSrvDirs {
if err := fileutils.CreateDirIfNotExists(dir); err != nil {
// Default bind address for meta srv.
bindAddr := net.JoinHostPort("127.0.0.1", "3002")
if len(m.config.BindAddr) > 0 {
bindAddr = m.config.BindAddr
}

for i := 0; i < m.config.Replicas; i++ {
dirName := fmt.Sprintf("%s.%d", m.Name(), i)

metaSrvLogDir := path.Join(m.workingDirs.LogsDir, dirName)
if err := fileutils.CreateDirIfNotExists(metaSrvLogDir); err != nil {
return err
}
m.logsDirs = append(m.logsDirs, metaSrvLogDir)

metaSrvPidDir := path.Join(m.workingDirs.PidsDir, dirName)
if err := fileutils.CreateDirIfNotExists(metaSrvPidDir); err != nil {
return err
}
m.pidsDirs = append(m.pidsDirs, metaSrvPidDir)

option := &RunOptions{
Binary: binary,
Name: dirName,
logDir: metaSrvLogDir,
pidDir: metaSrvPidDir,
args: m.BuildArgs(ctx, i, bindAddr),
}
if err := runBinary(ctx, option, m.wg, m.logger); err != nil {
return err
}
}
m.logsDirs = append(m.logsDirs, metaSrvLogDir)
m.pidsDirs = append(m.pidsDirs, metaSrvPidDir)

option := &RunOptions{
Binary: binary,
Name: m.Name(),
logDir: metaSrvLogDir,
pidDir: metaSrvPidDir,
args: m.BuildArgs(ctx),
}
if err := runBinary(ctx, option, m.wg, m.logger); err != nil {
return err
}

// Checking component running status with intervals.
Expand All @@ -101,33 +111,47 @@ func (m *metaSrv) BuildArgs(ctx context.Context, params ...interface{}) []string
if logLevel == "" {
logLevel = config.DefaultLogLevel
}

nodeID_, bindAddr_ := params[0], params[1]
nodeID := nodeID_.(int)
bindAddr := bindAddr_.(string)

args := []string{
fmt.Sprintf("--log-level=%s", logLevel),
m.Name(), "start",
"--store-addr", m.config.StoreAddr,
"--server-addr", m.config.ServerAddr,
"--http-addr", m.config.HTTPAddr,
"--http-addr", generateMetaSrvAddr(m.config.HTTPAddr, nodeID),
"--bind-addr", generateMetaSrvAddr(bindAddr, nodeID),
}
return args
}

func (m *metaSrv) IsRunning(ctx context.Context) bool {
_, httpPort, err := net.SplitHostPort(m.config.HTTPAddr)
if err != nil {
m.logger.V(5).Infof("failed to split host port in %s: %s", m.Name(), err)
return false
}
for i := 0; i < m.config.Replicas; i++ {
addr := generateMetaSrvAddr(m.config.HTTPAddr, i)
_, httpPort, err := net.SplitHostPort(addr)
if err != nil {
m.logger.V(5).Infof("failed to split host port in %s: %s", m.Name(), err)
return false
}

rsp, err := http.Get(fmt.Sprintf("http://localhost:%s/health", httpPort))
if err != nil {
m.logger.V(5).Infof("failed to get %s health: %s", m.Name(), err)
return false
}
if err = rsp.Body.Close(); err != nil {
return false
rsp, err := http.Get(fmt.Sprintf("http://localhost:%s/health", httpPort))
if err != nil {
m.logger.V(5).Infof("failed to get %s health: %s", m.Name(), err)
return false
}

if rsp.StatusCode != http.StatusOK {
return false
}

if err = rsp.Body.Close(); err != nil {
return false
}
}

return rsp.StatusCode == http.StatusOK
return true
}

func (m *metaSrv) Delete(ctx context.Context, option DeleteOptions) error {
Expand All @@ -136,3 +160,9 @@ func (m *metaSrv) Delete(ctx context.Context, option DeleteOptions) error {
}
return nil
}

func generateMetaSrvAddr(addr string, nodeID int) string {
host, port, _ := net.SplitHostPort(addr)
portInt, _ := strconv.Atoi(port)
return net.JoinHostPort(host, strconv.Itoa(portInt+nodeID))
}
1 change: 1 addition & 0 deletions pkg/deployer/baremetal/config/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func DefaultConfig() *Config {
Replicas: 1,
},
MetaSrv: &MetaSrv{
Replicas: 1,
StoreAddr: "127.0.0.1:2379",
ServerAddr: "0.0.0.0:3002",
HTTPAddr: "0.0.0.0:14001",
Expand Down
2 changes: 2 additions & 0 deletions pkg/deployer/baremetal/config/metasrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package config

type MetaSrv struct {
Replicas int `yaml:"replicas" validate:"gt=0"`

StoreAddr string `yaml:"storeAddr" validate:"hostname_port"`
ServerAddr string `yaml:"serverAddr" validate:"hostname_port"`
BindAddr string `yaml:"bindAddr" validate:"omitempty,hostname_port"`
Expand Down
1 change: 1 addition & 0 deletions pkg/deployer/baremetal/test_data/invalid_artifact.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ cluster:
mysqlAddr: 0.0.0.0:14200
httpAddr: 0.0.0.0:14300
meta:
replicas: 1
storeAddr: 127.0.0.1:2379
serverAddr: 0.0.0.0:3002
httpAddr: 0.0.0.0:14001
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ cluster:
mysqlAddr: 0.0.0.0:14200
httpAddr: 0.0.0.0:1438000 # invalid port
meta:
replicas: 1
storeAddr: 127.0.0.1:2379
serverAddr: 6870.0.0.0:3243002 # invalid hostname and port
httpAddr: 0.0.0.0:14001
Expand Down
1 change: 1 addition & 0 deletions pkg/deployer/baremetal/test_data/invalid_replicas.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ cluster:
mysqlAddr: 0.0.0.0:14200
httpAddr: 0.0.0.0:14300
meta:
replicas: 1
storeAddr: 127.0.0.1:2379
serverAddr: 0.0.0.0:3002
httpAddr: 0.0.0.0:14001
Expand Down
1 change: 1 addition & 0 deletions pkg/deployer/baremetal/test_data/valid_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ cluster:
mysqlAddr: 0.0.0.0:14200
httpAddr: 0.0.0.0:14300
meta:
replicas: 1
storeAddr: 127.0.0.1:2379
serverAddr: 0.0.0.0:3002
httpAddr: 0.0.0.0:14001
Expand Down
Loading