Skip to content

Commit 6574cfe

Browse files
authored Nov 14, 2024
OAS-10291 Allow to sync setup.json in default mode (no host and port specified) (#437) (#439)
1 parent 6a0f7b8 commit 6574cfe

7 files changed

+85
-44
lines changed
 

‎CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
- Use --net=host mode for Docker in tests. Extend restart tests.
66
- Fix termination issue in Process tests
77
- Bump Go version (1.22.8) and dependencies for CVE fixes
8+
- Allow to sync setup.json in default mode (no host and port specified)
89

910
## [v0.19.6](https://github.com/arangodb-helper/arangodb/tree/0.19.6) (2024-09-26)
1011
- Fix for member discovery during leader election

‎service/bootstrap_slave.go

+19-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (s *Service) bootstrapSlave(peerAddress string, runner Runner, config Confi
9898

9999
func RegisterPeer(log zerolog.Logger, masterURL string, req HelloRequest) ClusterConfig {
100100
for {
101-
log.Info().Msgf("Contacting leader %s...", masterURL)
101+
log.Info().Msgf("Registering peer with master at %s", masterURL)
102102

103103
encoded, err := json.Marshal(req)
104104
if err != nil {
@@ -157,6 +157,8 @@ func RegisterPeer(log zerolog.Logger, masterURL string, req HelloRequest) Cluste
157157
return result
158158
}
159159

160+
log.Info().Msgf("Successfully registered peer (ID: %s) with master at %s", req.SlaveID, masterURL)
161+
160162
return result
161163
}
162164
}
@@ -173,3 +175,19 @@ func BuildHelloRequest(id string, slavePort int, isSecure bool, config Config, b
173175
Coordinator: copyBoolRef(bsCfg.StartCoordinator),
174176
}
175177
}
178+
179+
func BuildHelloRequestFromPeer(p Peer) HelloRequest {
180+
return HelloRequest{
181+
DataDir: p.DataDir,
182+
SlaveID: p.ID,
183+
184+
// we can not change the address and port of the peer
185+
SlaveAddress: p.Address,
186+
SlavePort: p.Port,
187+
188+
IsSecure: p.IsSecure,
189+
Agent: &p.HasAgentFlag,
190+
DBServer: copyBoolRef(p.HasDBServerFlag),
191+
Coordinator: copyBoolRef(p.HasCoordinatorFlag),
192+
}
193+
}

‎service/cluster_config.go

+9
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,15 @@ func (p ClusterConfig) AllAgents() []Peer {
8181
return result
8282
}
8383

84+
func (p *ClusterConfig) IsPortOffsetInUse() bool {
85+
for _, x := range p.AllPeers {
86+
if x.PortOffset != 0 {
87+
return true
88+
}
89+
}
90+
return false
91+
}
92+
8493
// Initialize a new cluster configuration
8594
func (p *ClusterConfig) Initialize(initialPeer Peer, agencySize int, storageEngine string, persistentOptions options.PersistentOptions) {
8695
p.AllPeers = []Peer{initialPeer}

‎service/runtime_cluster_manager.go

+27-24
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"encoding/json"
2626
"fmt"
2727
"io"
28+
"reflect"
2829
"sync"
2930
"time"
3031

@@ -79,7 +80,7 @@ func (s *runtimeClusterManager) createAgencyAPI() (agency.Agency, error) {
7980
}
8081

8182
// updateClusterConfiguration asks the master at given URL for the latest cluster configuration.
82-
func (s *runtimeClusterManager) updateClusterConfiguration(ctx context.Context, masterURL string, req HelloRequest) error {
83+
func (s *runtimeClusterManager) updateClusterConfiguration(ctx context.Context, masterURL string, myPeer string) error {
8384
helloURL, err := getURLWithPath(masterURL, "/hello?update=1")
8485
if err != nil {
8586
return maskAny(err)
@@ -103,26 +104,25 @@ func (s *runtimeClusterManager) updateClusterConfiguration(ctx context.Context,
103104
if err := json.Unmarshal(body, &clusterConfig); err != nil {
104105
return maskAny(err)
105106
}
106-
// We've received a cluster config
107-
err = s.runtimeContext.UpdateClusterConfig(clusterConfig)
108-
if err != nil {
109-
s.log.Error().Err(err).Msg("Failed to update cluster configuration, Trying to register peer again")
110107

111-
_, hostPort, err := s.runtimeContext.GetHTTPServerPort()
112-
if err != nil {
113-
s.log.Fatal().Err(err).Msg("Failed to get HTTP server port during runtime cluster manager start")
114-
}
115-
req.SlavePort = hostPort
108+
latestPeerVersion, _ := s.myPeers.PeerByID(myPeer)
116109

117-
cfg := RegisterPeer(s.log, masterURL, req)
118-
s.myPeers = cfg
110+
myPeerFromMaster, exist := clusterConfig.PeerByID(myPeer)
111+
if !exist {
112+
s.log.Warn().Msgf("Leader responded with cluster config that does not contain this peer, re-registering. Local peer: %v", latestPeerVersion)
113+
clusterConfig = RegisterPeer(s.log, masterURL, BuildHelloRequestFromPeer(latestPeerVersion))
114+
}
119115

120-
// retry to update cluster configuration
121-
err = s.runtimeContext.UpdateClusterConfig(cfg)
122-
if err != nil {
123-
s.log.Error().Err(err).Msg("Failed to update cluster configuration after registering peer")
124-
return maskAny(err)
125-
}
116+
if !reflect.DeepEqual(latestPeerVersion, myPeerFromMaster) {
117+
s.log.Warn().Msgf("Leader responded with cluster config that contains a different peer, re-registering. Peer from master: %v, Local peer: %v", myPeerFromMaster, latestPeerVersion)
118+
clusterConfig = RegisterPeer(s.log, masterURL, BuildHelloRequestFromPeer(latestPeerVersion))
119+
}
120+
121+
// We've received a cluster config - let's store it
122+
err = s.runtimeContext.UpdateClusterConfig(clusterConfig)
123+
if err != nil {
124+
s.log.Warn().Err(err).Msg("Failed to update cluster configuration")
125+
return err
126126
}
127127

128128
return nil
@@ -182,16 +182,19 @@ func (s *runtimeClusterManager) runLeaderElection(ctx context.Context, myURL str
182182
var masterURL string
183183
var isMaster bool
184184

185-
s.log.Debug().
186-
Str("myURL", myURL).
187-
Str("masterURL", masterURL).
188-
Msg("Updating leadership")
189185
masterURL, isMaster, delay, err = le.Update(ctx, agencyClient, myURL)
190186
if err != nil {
191187
delay = 5 * time.Second
192188
s.log.Error().Err(err).Msgf("Update leader election failed. Retrying in %s", delay)
193189
continue
194190
}
191+
192+
s.log.Debug().
193+
Str("myURL", myURL).
194+
Str("masterURL", masterURL).
195+
Str("oldMasterURL", oldMasterURL).
196+
Msg("Updating leadership")
197+
195198
if isMaster && masterURL != myURL {
196199
s.log.Error().Msgf("Unexpected error: this peer is a master but URL differs. Should be %s got %s", myURL, masterURL)
197200
}
@@ -225,7 +228,7 @@ func (s *runtimeClusterManager) updateMasterURL(masterURL string, isMaster bool)
225228

226229
// Run keeps the cluster configuration up to date, either as master or as slave
227230
// during a running state.
228-
func (s *runtimeClusterManager) Run(ctx context.Context, log zerolog.Logger, runtimeContext runtimeClusterManagerContext, req HelloRequest) {
231+
func (s *runtimeClusterManager) Run(ctx context.Context, log zerolog.Logger, runtimeContext runtimeClusterManagerContext) {
229232
s.log = log
230233
s.runtimeContext = runtimeContext
231234
s.interruptChan = make(chan struct{}, 32)
@@ -255,7 +258,7 @@ func (s *runtimeClusterManager) Run(ctx context.Context, log zerolog.Logger, run
255258
if masterURL != "" && masterURL != ownURL {
256259
log.Debug().Msgf("Updating cluster configuration master URL: %s", masterURL)
257260
// We are a follower, try to update cluster configuration from leader
258-
if err := s.updateClusterConfiguration(ctx, masterURL, req); err != nil {
261+
if err := s.updateClusterConfiguration(ctx, masterURL, myPeer.ID); err != nil {
259262
delay = time.Second * 5
260263
log.Warn().Err(err).Msgf("Failed to load cluster configuration from %s", masterURL)
261264
} else {

‎service/service.go

+26-16
Original file line numberDiff line numberDiff line change
@@ -805,21 +805,26 @@ func (s *Service) HandleHello(ownAddress, remoteAddress string, req *HelloReques
805805
break
806806
}
807807
}
808-
// Slave address may not change in this case
809-
if addrFoundInOtherPeer && p.Address != slaveAddr {
810-
s.log.Warn().Msgf("Cannot change slave address while using an existing ID. Remote address: %s", remoteAddress)
811-
return ClusterConfig{}, maskAny(client.NewBadRequestError("Cannot change slave address while using an existing ID."))
808+
if addrFoundInOtherPeer && isLocalAddress(slaveAddr) && isLocalAddress(p.Address) && s.runtimeClusterManager.myPeers.IsPortOffsetInUse() {
809+
// This is a default configuration, where host and port are not set. Keep offset.
810+
s.log.Warn().Msgf("Updating slave with local address (%s). Offset (%d) will be kept. Peer id: %s",
811+
p.Address, p.PortOffset, p.ID)
812+
} else if addrFoundInOtherPeer && p.Address != slaveAddr {
813+
msg := fmt.Sprintf("Cannot change slave address (%s) to an address that is already in use by another peer (id: %s)", slaveAddr, p.ID)
814+
s.log.Warn().Msgf(msg)
815+
return ClusterConfig{}, maskAny(client.NewBadRequestError(msg))
816+
} else {
817+
// We accept the new address (it might be the old one):
818+
peer.Address = slaveAddr
819+
// However, since we also accept the port, we must set the
820+
// port offset of that replaced peer to 0 such that the AllPeers
821+
// information actually contains the right port.
822+
peer.PortOffset = 0
812823
}
813-
// We accept the new address (it might be the old one):
814-
peer.Address = slaveAddr
815-
// However, since we also accept the port, we must set the
816-
// port ofset of that replaced peer to 0 such that the AllPeers
817-
// information actually contains the right port.
818-
peer.PortOffset = 0
824+
819825
}
820826
peer.Port = req.SlavePort
821827
peer.DataDir = req.DataDir
822-
823828
peer.HasAgentFlag = boolFromRef(req.Agent, peer.HasAgentFlag)
824829
peer.HasCoordinatorFlag = utils.NotNilDefault(req.Coordinator, peer.HasCoordinatorFlag)
825830
peer.HasDBServerFlag = utils.NotNilDefault(req.DBServer, peer.HasDBServerFlag)
@@ -867,6 +872,10 @@ func (s *Service) HandleHello(ownAddress, remoteAddress string, req *HelloReques
867872
return s.runtimeClusterManager.myPeers, nil
868873
}
869874

875+
func isLocalAddress(addr string) bool {
876+
return addr == "127.0.0.1" || addr == "localhost"
877+
}
878+
870879
// ChangeState alters the current state of the service
871880
func (s *Service) ChangeState(newState State) {
872881
s.mutex.Lock()
@@ -989,8 +998,8 @@ func (s *Service) UpdateClusterConfig(newConfig ClusterConfig) error {
989998
// Only update when changed
990999
if !reflect.DeepEqual(s.runtimeClusterManager.myPeers, newConfig) {
9911000
s.runtimeClusterManager.myPeers = newConfig
992-
s.saveSetup()
993-
s.log.Debug().Msg("Updated cluster config")
1001+
s.log.Debug().Msgf("Updating cluster config - %v", newConfig)
1002+
return s.saveSetup()
9941003
} else {
9951004
s.log.Debug().Msg("Updating cluster config is not needed")
9961005
}
@@ -1030,7 +1039,7 @@ func (s *Service) GetHTTPServerPort() (containerPort, hostPort int, err error) {
10301039
if myPeer, ok := s.runtimeClusterManager.myPeers.PeerByID(s.id); ok {
10311040
containerPort += myPeer.PortOffset
10321041
} else {
1033-
return 0, 0, maskAny(fmt.Errorf("No peer information found for ID '%s'", s.id))
1042+
return 0, 0, maskAny(fmt.Errorf("no peer information found for ID '%s'", s.id))
10341043
}
10351044
}
10361045
if s.isNetHost {
@@ -1050,6 +1059,7 @@ func (s *Service) createHTTPServer(config Config) (srv *httpServer, containerPor
10501059
hostAddr = net.JoinHostPort(config.OwnAddress, strconv.Itoa(hostPort))
10511060

10521061
// Create HTTP server
1062+
s.log.Debug().Msgf("Creating HTTP server on %s", hostAddr)
10531063
return newHTTPServer(s.log, s, &s.runtimeServerManager, config, s.id), containerPort, hostAddr, containerAddr, nil
10541064
}
10551065

@@ -1095,7 +1105,7 @@ func (s *Service) startRunning(runner Runner, config Config, bsCfg BootstrapConf
10951105
go func() {
10961106
defer wg.Done()
10971107

1098-
s.runtimeClusterManager.Run(s.stopPeer.ctx, s.log, s, BuildHelloRequest(s.id, 0, s.IsSecure(), config, bsCfg))
1108+
s.runtimeClusterManager.Run(s.stopPeer.ctx, s.log, s)
10991109
}()
11001110

11011111
// Start the upgrade manager
@@ -1228,7 +1238,7 @@ func (s *Service) adjustClusterConfigForRelaunch(bsCfg BootstrapConfig) {
12281238

12291239
s.runtimeClusterManager.myPeers.ForEachPeer(func(p Peer) Peer {
12301240
if bsCfg.ID == p.ID {
1231-
s.log.Debug().Msgf("Adjusting current memeber cluster config after restart (port: %d)", p.Port)
1241+
s.log.Debug().Msgf("Adjusting current member cluster config (locally) after restart (port: %d)", p.Port)
12321242
p.peerServers = preparePeerServers(s.mode, bsCfg, &p)
12331243
}
12341244
return p

‎test/process_cluster_local_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestProcessClusterLocal(t *testing.T) {
4040
child := Spawn(t, "${STARTER} --starter.local "+createEnvironmentStarterOptions())
4141
defer child.Close()
4242

43-
if ok := WaitUntilStarterReady(t, whatCluster, 3, child); ok {
43+
if ok := WaitUntilStarterReady(t, whatCluster, 1, child); ok {
4444
t.Logf("Cluster start took %s", time.Since(start))
4545
testCluster(t, insecureStarterEndpoint(0*portIncrement), false)
4646
testCluster(t, insecureStarterEndpoint(1*portIncrement), false)
@@ -65,7 +65,7 @@ func TestProcessClusterLocalShutdownViaAPI(t *testing.T) {
6565
child := Spawn(t, "${STARTER} --starter.local "+createEnvironmentStarterOptions())
6666
defer child.Close()
6767

68-
if ok := WaitUntilStarterReady(t, whatCluster, 3, child); ok {
68+
if ok := WaitUntilStarterReady(t, whatCluster, 1, child); ok {
6969
t.Logf("Cluster start took %s", time.Since(start))
7070
testCluster(t, insecureStarterEndpoint(0*portIncrement), false)
7171
testCluster(t, insecureStarterEndpoint(1*portIncrement), false)
@@ -88,7 +88,7 @@ func TestOldProcessClusterLocal(t *testing.T) {
8888
child := Spawn(t, "${STARTER} --local "+createEnvironmentStarterOptions())
8989
defer child.Close()
9090

91-
if ok := WaitUntilStarterReady(t, whatCluster, 3, child); ok {
91+
if ok := WaitUntilStarterReady(t, whatCluster, 1, child); ok {
9292
t.Logf("Cluster start took %s", time.Since(start))
9393
testCluster(t, insecureStarterEndpoint(0*portIncrement), false)
9494
testCluster(t, insecureStarterEndpoint(1*portIncrement), false)
File renamed without changes.

0 commit comments

Comments
 (0)
Please sign in to comment.