Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

write etcd tombstone file and export errc channel #2

Open
wants to merge 4 commits into
base: release-3.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions etcdmain/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"reflect"
"runtime"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -75,7 +76,7 @@ func startEtcdOrProxyV2() {
plog.Errorf("When listening on specific address(es), this etcd process must advertise accessible url(s) to each connected client.")
}
}
os.Exit(1)
writeErrorCodeAndExit(cfg.ec.Dir, 1, lg)
}

if lg == nil {
Expand Down Expand Up @@ -127,7 +128,7 @@ func startEtcdOrProxyV2() {

var stopped <-chan struct{}
var errc <-chan error

var lerrc <-chan error
galal-hussein marked this conversation as resolved.
Show resolved Hide resolved
which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir)
if which != dirEmpty {
if lg != nil {
Expand All @@ -141,7 +142,7 @@ func startEtcdOrProxyV2() {
}
switch which {
case dirMember:
stopped, errc, err = startEtcd(&cfg.ec)
stopped, lerrc, errc, err = startEtcd(&cfg.ec)
case dirProxy:
err = startProxy(cfg)
default:
Expand All @@ -157,7 +158,7 @@ func startEtcdOrProxyV2() {
} else {
shouldProxy := cfg.isProxy()
if !shouldProxy {
stopped, errc, err = startEtcd(&cfg.ec)
stopped, lerrc, errc, err = startEtcd(&cfg.ec)
if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == v2discovery.ErrFullCluster {
if cfg.shouldFallbackToProxy() {
if lg != nil {
Expand Down Expand Up @@ -235,7 +236,7 @@ func startEtcdOrProxyV2() {
plog.Infof("please generate a new discovery token and try to bootstrap again.")
}
}
os.Exit(1)
writeErrorCodeAndExit(cfg.ec.Dir, 1, lg)
}

if strings.Contains(err.Error(), "include") && strings.Contains(err.Error(), "--initial-cluster") {
Expand Down Expand Up @@ -265,7 +266,7 @@ func startEtcdOrProxyV2() {
plog.Infof("if you want to use discovery service, please set --discovery flag.")
}
}
os.Exit(1)
writeErrorCodeAndExit(cfg.ec.Dir, 1, lg)
}
if lg != nil {
lg.Fatal("discovery failed", zap.Error(err))
Expand All @@ -284,6 +285,10 @@ func startEtcdOrProxyV2() {
notifySystemd(lg)

select {
case errc := <- lerrc:
if strings.Contains(errc.Error(), etcdserver.ErrMemberRemoved.Error()) {
writeErrorCodeAndExit(cfg.ec.Dir, 10, lg)
}
case lerr := <-errc:
// fatal out on listener errors
if lg != nil {
Expand All @@ -298,17 +303,18 @@ func startEtcdOrProxyV2() {
}

// startEtcd runs StartEtcd in addition to hooks needed for standalone etcd.
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) {
func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, <-chan error, error) {
e, err := embed.StartEtcd(cfg)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
osutil.RegisterInterruptHandler(e.Close)
select {
case <-e.Server.ReadyNotify(): // wait for e.Server to join the cluster
case <-e.Server.ErrNotify(): // publish aborted errc channel
case <-e.Server.StopNotify(): // publish aborted from 'ErrStopped'
}
return e.Server.StopNotify(), e.Err(), nil
return e.Server.StopNotify(), e.Server.ErrNotify(), e.Err(), nil
}

// startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
Expand Down Expand Up @@ -617,3 +623,18 @@ func checkSupportArch() {
fmt.Printf("etcd on unsupported platform without ETCD_UNSUPPORTED_ARCH=%s set\n", runtime.GOARCH)
os.Exit(1)
}


func writeErrorCodeAndExit(dataDir string, errorCode int, lg *zap.Logger) {
errorCodeFile := filepath.Join(dataDir, "tombstone")
if err := ioutil.WriteFile(errorCodeFile, []byte(strconv.Itoa(errorCode)), 0600); err != nil {
if lg != nil {
lg.Fatal(
"failed to write tombstone file",
zap.String("tombstone-file", errorCodeFile),
)
} else {
plog.Fatalf("failed to write tombstone file %s",errorCodeFile)
}
}
}
1 change: 1 addition & 0 deletions etcdserver/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
ErrKeyNotFound = errors.New("etcdserver: key not found")
ErrCorrupt = errors.New("etcdserver: corrupt cluster")
ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee")
ErrMemberRemoved = errors.New("the member has been permanently removed from the cluster")
)

type DiscoveryError struct {
Expand Down
16 changes: 15 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/http"
"os"
"path"
"path/filepath"
"regexp"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -105,6 +106,7 @@ var (
plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "etcdserver")

storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))

)

func init() {
Expand Down Expand Up @@ -1388,7 +1390,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
}
var shouldstop bool
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
go s.stopWithDelay(10*100*time.Millisecond, ErrMemberRemoved)
}
}

Expand Down Expand Up @@ -1551,6 +1553,8 @@ func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
// when the server is stopped.
func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }

func (s *EtcdServer) ErrNotify() <-chan error { return s.errorc }

func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }

func (s *EtcdServer) LeaderStats() []byte {
Expand Down Expand Up @@ -2145,6 +2149,7 @@ func (s *EtcdServer) apply(
s.consistIndex.setConsistentIndex(e.Index)
}
var cc raftpb.ConfChange
var removedSelf bool
pbutil.MustUnmarshal(&cc, e.Data)
removedSelf, err := s.applyConfChange(cc, confState)
s.setAppliedIndex(e.Index)
Expand Down Expand Up @@ -2670,6 +2675,15 @@ func (s *EtcdServer) Logger() *zap.Logger {

// IsLearner returns if the local member is raft learner
func (s *EtcdServer) IsLearner() bool {
tombstoneFile := filepath.Join(s.Cfg.DataDir, "tombstone")
if _, err := os.Stat(tombstoneFile); err == nil {
if lg := s.getLogger(); lg != nil {
lg.Warn("this server has been removed from the cluster, to rejoin please restart the server")
} else {
plog.Warning("this server has been removed from the cluster, to rejoin please restart the server")
}
return false
}
return s.cluster.IsLocalMemberLearner()
}

Expand Down