PBM-1226: sync agents during applyConfig
defbin committed Nov 6, 2024
1 parent fa388cf commit 262bf60
Showing 1 changed file with 203 additions and 49 deletions.
252 changes: 203 additions & 49 deletions cmd/pbm-agent/profile.go
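
This change reworks handleApplyConfig so that a new configuration is validated by every agent in the cluster, not just applied by the leader: each agent checks the new storage and logging settings locally, the agents confirm each phase by writing status messages (ready, starting, running, partly done, done) to the PBM log, and only the cluster leader acquires the lock and persists the config once all agents have reported success. The new waitForState helper follows the log filtered by the command's OPID and counts those confirmations. Below is a minimal, self-contained sketch of that counting-barrier idea; the statusMsg type, the channel, and the waitForAll helper are illustrative stand-ins, not the PBM API.

// A minimal sketch of the log-based counting barrier used by waitForState.
// statusMsg, the channel, and waitForAll are illustrative stand-ins, not the PBM API.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// statusMsg stands in for a PBM log entry carrying an agent's status.
type statusMsg struct {
	Node  string
	State string // e.g. "ready", "starting", "running"
	Err   error
}

// waitForAll returns nil once `count` messages with the wanted state arrive,
// or an error if an agent reports a failure or the deadline passes.
func waitForAll(ctx context.Context, msgs <-chan statusMsg, count int, state string) error {
	ctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()

	for {
		select {
		case m := <-msgs:
			if m.Err != nil {
				return fmt.Errorf("agent %s: %w", m.Node, m.Err)
			}
			if m.State == state {
				count--
				if count == 0 {
					return nil // every expected agent has confirmed
				}
			}
		case <-ctx.Done():
			if errors.Is(ctx.Err(), context.DeadlineExceeded) {
				return errors.New("no response confirmation from agents")
			}
			return ctx.Err()
		}
	}
}

func main() {
	msgs := make(chan statusMsg, 3)
	for _, node := range []string{"rs0/node-a", "rs0/node-b", "rs0/node-c"} {
		msgs <- statusMsg{Node: node, State: "starting"}
	}
	if err := waitForAll(context.Background(), msgs, 3, "starting"); err != nil {
		fmt.Println("apply-config failed:", err)
		return
	}
	fmt.Println("all agents confirmed")
}

As in the committed waitForState, the sketch bounds the wait with a one-minute deadline and turns a deadline expiry into a clearer "no response confirmation from agents" error; exposing that timeout is left as a TODO in the commit.
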
@@ -2,6 +2,7 @@ package main

import (
"context"
"time"

"go.mongodb.org/mongo-driver/mongo"

@@ -37,93 +38,246 @@ func (a *Agent) handleApplyConfig(
}
}()

oldCfg, err := config.GetConfig(ctx, a.leadConn)
if err != nil {
log.Warn(ctx, "no config set")
oldCfg = &config.Config{}
}
log.Debug(ctx, "init: applying config")

nodeInfo, err := topo.GetNodeInfoExt(ctx, a.nodeConn)
if err != nil {
err = errors.Wrap(err, "get node info")
return
}
if !nodeInfo.IsClusterLeader() {
log.Debug(ctx, "not the leader. skip")
return
}

lck := lock.NewLock(a.leadConn, lock.LockHeader{
Type: ctrl.CmdApplyConfig,
Replset: defs.Replset(),
Node: defs.NodeID(),
OPID: opid.String(),
Epoch: util.Ref(epoch.TS()),
})
// only the leader acquires the lock and sets the config;
// the other agents report on config suitability for their own processes.
leader := nodeInfo.IsClusterLeader()

// acquire the lock on the cluster leader to prevent other commands,
// but run the command on all agents to verify config suitability for each agent.
// TODO: add the flow to handleAddConfigProfile
if leader {
lck := lock.NewLock(a.leadConn, lock.LockHeader{
Type: ctrl.CmdApplyConfig,
Replset: defs.Replset(),
Node: defs.NodeID(),
OPID: opid.String(),
Epoch: util.Ref(epoch.TS()),
})

var got bool
got, err = a.acquireLock(ctx, lck)
if err != nil {
err = errors.Wrap(err, "acquiring lock")
return
}
if !got {
err = errors.New("lock not acquired")
return
}
defer func() {
log.Debug(ctx, "releasing lock")
err := lck.Release()
if err != nil {
log.Error(ctx, "unable to release lock %v: %v", lck, err)
}
}()

// set the status on the leader.
// backup and restore have a state machine based on their metadata;
// other commands/operations rely on pbm logs for results.
// TODO: PBM-950
log.Info(ctx, string(defs.StatusReady))
// log.Error(ctx, string(defs.StatusError))
}

got, err := a.acquireLock(ctx, lck)
// wait for the status from a single node, the leader.
// TODO: rewrite with PBM-950
err = waitForState(ctx, a.leadConn, opid, 1, defs.StatusReady)
if err != nil {
err = errors.Wrap(err, "acquiring lock")
err = errors.Wrapf(err, "wait for status %s", defs.StatusReady)
return
}
if !got {
err = errors.New("lock not acquired")
return

oldCfg, err := config.GetConfig(ctx, a.leadConn)
if err != nil {
log.Warn(ctx, "no config set")
oldCfg = &config.Config{}
}
defer func() {
log.Debug(ctx, "releasing lock")
err := lck.Release()
if err != nil {
log.Error(ctx, "unable to release lock %v: %v", lck, err)
}
}()

newCfg := &config.Config{
Storage: *cmd.Storage,
PITR: cmd.PITR,
Backup: cmd.Backup,
Restore: cmd.Restore,
Logging: cmd.Logging,
Epoch: oldCfg.Epoch,
}
err = config.SetConfig(ctx, a.leadConn, newCfg)

numAgents, err := countAgents(ctx, a.leadConn)
if err != nil {
err = errors.Wrap(err, "set config")
err = errors.Wrap(err, "count agents")
return
}
if numAgents < 1 {
err = errors.Errorf("unexpected number of agents: %v", numAgents)
return
}

log.Info(ctx, "config applied")
// TODO: rewrite with PBM-950
// set status on each node.
log.Debug(ctx, string(defs.StatusStarting))
// wait for the status or error from all agents.
err = waitForState(ctx, a.leadConn, opid, numAgents, defs.StatusStarting)
if err != nil {
err = errors.Wrapf(err, "wait for status %s", defs.StatusStarting)
return
}

if !oldCfg.Logging.Equal(cmd.Logging) {
err1 := switchLogger(ctx, a.leadConn, cmd.Logging)
if err1 != nil {
err = errors.Wrap(err1, "failed to switch logger")
if !oldCfg.Storage.Equal(&newCfg.Storage) {
// check storage availability from the agent.
var stg storage.Storage
stg, err = util.StorageFromConfig(ctx, &newCfg.Storage, defs.NodeID())
if err != nil {
err = errors.Wrap(err, "get storage")
return
}

err = storage.HasReadAccess(ctx, stg)
if err != nil {
if !errors.Is(err, storage.ErrUninitialized) {
err = errors.Wrap(err, "check read access")
return
}

if leader {
err = util.Initialize(ctx, stg)
if err != nil {
err = errors.Wrap(err, "init storage")
return
}
}
}
}
}

func switchLogger(ctx context.Context, conn connect.Client, cfg *config.Logging) error {
log.Warn(ctx, "changing local log handler")
var logHandler *log.FileHandler
if !oldCfg.Logging.Equal(newCfg.Logging) {
// check log config on the agent.
logHandler, err = log.NewFileHandler(
newCfg.Logging.Path,
newCfg.Logging.Level.Level(),
newCfg.Logging.JSON)
if err != nil {
err = errors.Wrap(err, "create file handler")
return
}

defer func() {
if err != nil {
// if config is not applied, revert (just close file)
err1 := logHandler.Close()
if err1 != nil {
log.Error(ctx, "failed to close unused log handler")
}
}
}()
}

// TODO: rewrite with PBM-950
log.Debug(ctx, string(defs.StatusRunning))
err = waitForState(ctx, a.leadConn, opid, numAgents, defs.StatusRunning)
if err != nil {
err = errors.Wrapf(err, "wait for status %s", defs.StatusRunning)
return
}

if leader {
// save config by leader
err = config.SetConfig(ctx, a.leadConn, newCfg)
if err != nil {
err = errors.Wrap(err, "set config")
return
}

log.Debug(ctx, string(defs.StatusPartlyDone))
}

logHandler, err := log.NewFileHandler(cfg.Path, cfg.Level.Level(), cfg.JSON)
// almost done. if saving the config on the leader fails, the config is not applied anywhere.
// need to wait before applying the new log handler
err = waitForState(ctx, a.leadConn, opid, 1, defs.StatusPartlyDone)
if err != nil {
return errors.Wrap(err, "create file handler")
err = errors.Wrapf(err, "wait for status %s", defs.StatusPartlyDone)
return
}

prevLogHandler := log.SetLocalHandler(logHandler)
if logHandler != nil {
// there were changes in the log config; they are applied by each agent.
log.Warn(ctx, "changing local log handler")

prevLogHandler := log.SetLocalHandler(logHandler)

log.Info(ctx, "pbm-agent:\n%s", version.Current().All(""))
log.Info(ctx, "node: %s/%s", defs.Replset(), defs.NodeID())
log.Info(ctx, "conn level ReadConcern: %v; WriteConcern: %v",
conn.MongoOptions().ReadConcern.Level,
conn.MongoOptions().WriteConcern.W)
log.Info(ctx, "pbm-agent:\n%s", version.Current().All(""))
log.Info(ctx, "node: %s/%s", defs.Replset(), defs.NodeID())
log.Info(ctx, "conn level ReadConcern: %v; WriteConcern: %v",
a.leadConn.MongoOptions().ReadConcern.Level,
a.leadConn.MongoOptions().WriteConcern.W)

err = prevLogHandler.Close()
// if closing the previous handler fails, the config is still suitable
// and the new logger is ready for use. print the error but proceed.
err := prevLogHandler.Close()
if err != nil {
log.Error(ctx, "close previous log handler: %v", err)
}
}

log.Info(ctx, string(defs.StatusDone))
}

func countAgents(ctx context.Context, conn connect.Client) (int, error) {
agents, err := topo.ListAgentStatuses(ctx, conn)
if err != nil {
log.Error(ctx, "close previous log handler: %v", err)
return 0, err
}
return len(agents), nil
}

return nil
func waitForState(
ctx context.Context,
conn connect.Client,
opid ctrl.OPID,
count int,
state defs.Status,
) error {
// TODO: expose timeout control
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

// TODO: consider performance impact of tail cursors from all agents
outC, errC := log.Follow(ctx, conn, &log.LogRequest{
RecordAttrs: log.RecordAttrs{
OPID: opid.String(),
Level: log.DebugLevel,
},
}, false)

for {
select {
case entry := <-outC:
if entry.Msg == string(state) {
count -= 1 // found
if count == 0 {
return nil // all found
}
} else if entry.Level == log.ErrorLevel {
return errors.New(entry.Msg)
}
case err := <-errC:
return err
case <-ctx.Done():
err := ctx.Err()
if errors.Is(err, context.DeadlineExceeded) {
err = errors.New("no response confirmation from agents")
}
return err
}
}
}

func (a *Agent) handleAddConfigProfile(

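For reference, the handshake introduced above runs in phases: non-leader agents first wait for the leader's ready status; every agent then reports starting, verifies storage access and the logging config, and reports running; only after all agents are running does the leader save the config and report partly done; finally each agent swaps in the new log handler and the operation ends with done. The simulation below mirrors that ordering with channels instead of the PBM log collection; it is a rough sketch, and the channel-based barrier is an illustration, not PBM code.

// A rough, runnable simulation of the apply-config handshake using channels
// instead of the PBM log collection. Illustrative only, not PBM code.
// Note: in PBM every agent waits at each barrier by following the log;
// for brevity only the leader-side waits are shown here.
package main

import (
	"fmt"
	"sync"
)

func main() {
	const agents = 3
	starting := make(chan struct{}, agents)
	running := make(chan struct{}, agents)
	saved := make(chan struct{}) // closed by the leader once the config is persisted

	var wg sync.WaitGroup
	for i := 0; i < agents; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// report "starting": the agent received the command and will validate the config.
			starting <- struct{}{}
			// ... storage access and log handler checks would happen here ...
			// report "running": local validation succeeded.
			running <- struct{}{}
			// wait until the leader has persisted the config ("partly done"),
			// then switch to the new local log handler.
			<-saved
			fmt.Printf("agent %d: new config in effect\n", id)
		}(i)
	}

	// leader side: wait for every agent at each barrier before saving the config.
	for i := 0; i < agents; i++ {
		<-starting
	}
	for i := 0; i < agents; i++ {
		<-running
	}
	fmt.Println("leader: all agents confirmed, saving the config")
	close(saved)
	wg.Wait()
}

Until PBM-950 adds a dedicated status mechanism (as the TODOs in the diff note), the PBM log itself serves as the coordination channel, which is why waitForState tails log entries filtered by the command's OPID.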