diff --git a/cmd/pbm-agent/profile.go b/cmd/pbm-agent/profile.go
index e69ddb699..4d6122669 100644
--- a/cmd/pbm-agent/profile.go
+++ b/cmd/pbm-agent/profile.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"context"
+	"time"
 
 	"go.mongodb.org/mongo-driver/mongo"
 
@@ -37,46 +38,69 @@ func (a *Agent) handleApplyConfig(
 		}
 	}()
 
-	oldCfg, err := config.GetConfig(ctx, a.leadConn)
-	if err != nil {
-		log.Warn(ctx, "no config set")
-		oldCfg = &config.Config{}
-	}
+	log.Debug(ctx, "init: applying config")
 
 	nodeInfo, err := topo.GetNodeInfoExt(ctx, a.nodeConn)
 	if err != nil {
 		err = errors.Wrap(err, "get node info")
 		return
 	}
-	if !nodeInfo.IsClusterLeader() {
-		log.Debug(ctx, "not the leader. skip")
-		return
-	}
 
-	lck := lock.NewLock(a.leadConn, lock.LockHeader{
-		Type:    ctrl.CmdApplyConfig,
-		Replset: defs.Replset(),
-		Node:    defs.NodeID(),
-		OPID:    opid.String(),
-		Epoch:   util.Ref(epoch.TS()),
-	})
+	// only the leader acquires the lock and sets the config.
+	// others report on config suitability for their processes.
+	leader := nodeInfo.IsClusterLeader()
+
+	// acquire the lock on the cluster leader to prevent other commands,
+	// but run the command on all agents to verify config suitability for each agent.
+	// TODO: add the flow to handleAddConfigProfile
+	if leader {
+		lck := lock.NewLock(a.leadConn, lock.LockHeader{
+			Type:    ctrl.CmdApplyConfig,
+			Replset: defs.Replset(),
+			Node:    defs.NodeID(),
+			OPID:    opid.String(),
+			Epoch:   util.Ref(epoch.TS()),
+		})
+
+		var got bool
+		got, err = a.acquireLock(ctx, lck)
+		if err != nil {
+			err = errors.Wrap(err, "acquiring lock")
+			return
+		}
+		if !got {
+			err = errors.New("lock not acquired")
+			return
+		}
+		defer func() {
+			log.Debug(ctx, "releasing lock")
+			err := lck.Release()
+			if err != nil {
+				log.Error(ctx, "unable to release lock %v: %v", lck, err)
+			}
+		}()
+
+		// set the status on the leader.
+		// backup and restore have a state machine using their metadata;
+		// other commands/operations rely on pbm logs for results.
+		// TODO: PBM-950
+		log.Info(ctx, string(defs.StatusReady))
+		// log.Error(ctx, string(defs.StatusError))
+	}
 
-	got, err := a.acquireLock(ctx, lck)
+	// wait for the status from a single node - the leader.
+	// TODO: rewrite with PBM-950
+	err = waitForState(ctx, a.leadConn, opid, 1, defs.StatusReady)
 	if err != nil {
-		err = errors.Wrap(err, "acquiring lock")
+		err = errors.Wrapf(err, "wait for status %s", defs.StatusReady)
 		return
 	}
-	if !got {
-		err = errors.New("lock not acquired")
-		return
+
+	oldCfg, err := config.GetConfig(ctx, a.leadConn)
+	if err != nil {
+		log.Warn(ctx, "no config set")
+		oldCfg = &config.Config{}
 	}
-	defer func() {
-		log.Debug(ctx, "releasing lock")
-		err := lck.Release()
-		if err != nil {
-			log.Error(ctx, "unable to release lock %v: %v", lck, err)
-		}
-	}()
 
 	newCfg := &config.Config{
 		Storage: *cmd.Storage,
@@ -84,46 +108,176 @@ func (a *Agent) handleApplyConfig(
 		Backup:  cmd.Backup,
 		Restore: cmd.Restore,
 		Logging: cmd.Logging,
+		Epoch:   oldCfg.Epoch,
 	}
-	err = config.SetConfig(ctx, a.leadConn, newCfg)
+
+	numAgents, err := countAgents(ctx, a.leadConn)
 	if err != nil {
-		err = errors.Wrap(err, "set config")
+		err = errors.Wrap(err, "count agents")
+		return
+	}
+	if numAgents < 1 {
+		err = errors.Errorf("unexpected number of agents: %v", numAgents)
 		return
 	}
 
-	log.Info(ctx, "config applied")
+	// TODO: rewrite with PBM-950
+	// set the status on each node.
+	log.Debug(ctx, string(defs.StatusStarting))
+	// wait for the status or an error from all agents.
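+	// NOTE: waitForState doubles as a synchronization barrier: each agent
+	// logs the status for this OPID, then tails the pbm log until `count`
+	// agents have reported that status (or one of them reports an error).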
+	err = waitForState(ctx, a.leadConn, opid, numAgents, defs.StatusStarting)
+	if err != nil {
+		err = errors.Wrapf(err, "wait for status %s", defs.StatusStarting)
+		return
+	}
 
-	if !oldCfg.Logging.Equal(cmd.Logging) {
-		err1 := switchLogger(ctx, a.leadConn, cmd.Logging)
-		if err1 != nil {
-			err = errors.Wrap(err1, "failed to switch logger")
+	if !oldCfg.Storage.Equal(&newCfg.Storage) {
+		// check storage availability from the agent.
+		var stg storage.Storage
+		stg, err = util.StorageFromConfig(ctx, &newCfg.Storage, defs.NodeID())
+		if err != nil {
+			err = errors.Wrap(err, "get storage")
 			return
 		}
+
+		err = storage.HasReadAccess(ctx, stg)
+		if err != nil {
+			if !errors.Is(err, storage.ErrUninitialized) {
+				err = errors.Wrap(err, "check read access")
+				return
+			}
+
+			if leader {
+				err = util.Initialize(ctx, stg)
+				if err != nil {
+					err = errors.Wrap(err, "init storage")
+					return
+				}
+			}
+		}
 	}
-}
 
-func switchLogger(ctx context.Context, conn connect.Client, cfg *config.Logging) error {
-	log.Warn(ctx, "changing local log handler")
+	var logHandler *log.FileHandler
+	if !oldCfg.Logging.Equal(newCfg.Logging) {
+		// check the log config on the agent.
+		logHandler, err = log.NewFileHandler(
+			newCfg.Logging.Path,
+			newCfg.Logging.Level.Level(),
+			newCfg.Logging.JSON)
+		if err != nil {
+			err = errors.Wrap(err, "create file handler")
+			return
+		}
+
+		defer func() {
+			if err != nil {
+				// if the config is not applied, revert (just close the file)
+				err1 := logHandler.Close()
+				if err1 != nil {
+					log.Error(ctx, "failed to close unused log handler")
+				}
+			}
+		}()
+	}
+
+	// TODO: rewrite with PBM-950
+	log.Debug(ctx, string(defs.StatusRunning))
+	err = waitForState(ctx, a.leadConn, opid, numAgents, defs.StatusRunning)
+	if err != nil {
+		err = errors.Wrapf(err, "wait for status %s", defs.StatusRunning)
+		return
+	}
+
+	if leader {
+		// save the config on the leader.
+		err = config.SetConfig(ctx, a.leadConn, newCfg)
+		if err != nil {
+			err = errors.Wrap(err, "set config")
+			return
+		}
+
+		log.Debug(ctx, string(defs.StatusPartlyDone))
+	}
 
-	logHandler, err := log.NewFileHandler(cfg.Path, cfg.Level.Level(), cfg.JSON)
+	// almost done. if saving the config on the leader fails, the config is not applied anywhere.
+	// need to wait before applying the new log handler.
+	err = waitForState(ctx, a.leadConn, opid, 1, defs.StatusPartlyDone)
 	if err != nil {
-		return errors.Wrap(err, "create file handler")
+		err = errors.Wrapf(err, "wait for status %s", defs.StatusPartlyDone)
+		return
 	}
 
-	prevLogHandler := log.SetLocalHandler(logHandler)
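+	// NOTE: agents swap the local log handler only after the leader has
+	// confirmed the config write with StatusPartlyDone, so a failed save
+	// leaves no agent running with a half-applied logging config.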
+	if logHandler != nil {
+		// there were changes in the log config. apply them on each agent.
+		log.Warn(ctx, "changing local log handler")
+
+		prevLogHandler := log.SetLocalHandler(logHandler)
 
-	log.Info(ctx, "pbm-agent:\n%s", version.Current().All(""))
-	log.Info(ctx, "node: %s/%s", defs.Replset(), defs.NodeID())
-	log.Info(ctx, "conn level ReadConcern: %v; WriteConcern: %v",
-		conn.MongoOptions().ReadConcern.Level,
-		conn.MongoOptions().WriteConcern.W)
+		log.Info(ctx, "pbm-agent:\n%s", version.Current().All(""))
+		log.Info(ctx, "node: %s/%s", defs.Replset(), defs.NodeID())
+		log.Info(ctx, "conn level ReadConcern: %v; WriteConcern: %v",
+			a.leadConn.MongoOptions().ReadConcern.Level,
+			a.leadConn.MongoOptions().WriteConcern.W)
 
-	err = prevLogHandler.Close()
+		// if closing the handler fails, the config is still suitable
+		// and the new logger is ready for use. print the error but proceed.
+		err := prevLogHandler.Close()
+		if err != nil {
+			log.Error(ctx, "close previous log handler: %v", err)
+		}
+	}
+
+	log.Info(ctx, string(defs.StatusDone))
+}
+
+func countAgents(ctx context.Context, conn connect.Client) (int, error) {
+	agents, err := topo.ListAgentStatuses(ctx, conn)
 	if err != nil {
-		log.Error(ctx, "close previous log handler: %v", err)
+		return 0, err
 	}
+	return len(agents), nil
+}
 
-	return nil
+func waitForState(
+	ctx context.Context,
+	conn connect.Client,
+	opid ctrl.OPID,
+	count int,
+	state defs.Status,
+) error {
+	// TODO: expose timeout control
+	ctx, cancel := context.WithTimeout(ctx, time.Minute)
+	defer cancel()
+
+	// TODO: consider the performance impact of tail cursors from all agents
+	outC, errC := log.Follow(ctx, conn, &log.LogRequest{
+		RecordAttrs: log.RecordAttrs{
+			OPID:  opid.String(),
+			Level: log.DebugLevel,
+		},
+	}, false)
+
+	for {
+		select {
+		case entry := <-outC:
+			if entry.Msg == string(state) {
+				count -= 1 // found one
+				if count == 0 {
+					return nil // all found
+				}
+			} else if entry.Level == log.ErrorLevel {
+				return errors.New(entry.Msg)
+			}
+		case err := <-errC:
+			return err
+		case <-ctx.Done():
+			err := ctx.Err()
+			if errors.Is(err, context.DeadlineExceeded) {
+				err = errors.New("no response confirmation from agents")
+			}
+			return err
+		}
+	}
 }
 
 func (a *Agent) handleAddConfigProfile(