PBM-1226: add log config
defbin committed Nov 6, 2024
1 parent ecb9f2c commit fa388cf
Showing 62 changed files with 2,199 additions and 1,661 deletions.
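Most of the change is a mechanical switch from the event-based logger to context-aware logging helpers: call sites stop creating and passing a log event explicitly (logger.NewEvent(...), l.Error(...), l.Warning(...)) and instead call package-level functions that take the context (log.Error(ctx, ...), log.Warn(ctx, ...), log.Debug(ctx, ...)). The line ctx = log.SetLogEventToContext(ctx, l) is removed from Backup, so the event is presumably attached to the context earlier by the command dispatcher in cmd/pbm-agent/agent.go, whose diff is not rendered below. A condensed before/after of the call-site pattern, assembled from the backup.go hunks below:

    // before: an event is created and threaded through explicitly
    logger := log.FromContext(ctx)
    l := logger.NewEvent(string(ctrl.CmdBackup), cmd.Name, opid.String(), ep.TS())
    ctx = log.SetLogEventToContext(ctx, l)
    l.Error("get node info: %v", err)
    l.Warning("set nominee ack: %v", err)

    // after: helpers read the event from the context; no logger argument is passed around
    log.Error(ctx, "get node info: %v", err)
    log.Warn(ctx, "set nominee ack: %v", err)
    log.Debug(ctx, "init backup meta")

Along the same lines, acquireLock and bcp.Run drop their explicit log-event parameter, and the replset and node identity used for locks and nominations move from a.brief.SetName and a.brief.Me to defs.Replset() and defs.NodeID().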
233 changes: 148 additions & 85 deletions cmd/pbm-agent/agent.go

Large diffs are not rendered by default.

92 changes: 44 additions & 48 deletions cmd/pbm-agent/backup.go
@@ -43,24 +43,18 @@ func (a *Agent) CancelBackup() {

// Backup starts backup
func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID, ep config.Epoch) {
logger := log.FromContext(ctx)

if cmd == nil {
l := logger.NewEvent(string(ctrl.CmdBackup), "", opid.String(), ep.TS())
l.Error("missed command")
log.Error(ctx, "missed command")
return
}

l := logger.NewEvent(string(ctrl.CmdBackup), cmd.Name, opid.String(), ep.TS())
ctx = log.SetLogEventToContext(ctx, l)

nodeInfo, err := topo.GetNodeInfoExt(ctx, a.nodeConn)
if err != nil {
l.Error("get node info: %v", err)
log.Error(ctx, "get node info: %v", err)
return
}
if nodeInfo.ArbiterOnly {
l.Debug("arbiter node. skip")
log.Debug(ctx, "arbiter node. skip")
return
}

@@ -69,24 +63,24 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
if isClusterLeader {
moveOn, err := a.startBcpLockCheck(ctx)
if err != nil {
l.Error("start backup lock check: %v", err)
log.Error(ctx, "start backup lock check: %v", err)
return
}
if !moveOn {
l.Error("unable to proceed with the backup, active lock is present")
log.Error(ctx, "unable to proceed with the backup, active lock is present")
return
}
}

canRunBackup, err := topo.NodeSuitsExt(ctx, a.nodeConn, nodeInfo, cmd.Type)
if err != nil {
l.Error("node check: %v", err)
log.Error(ctx, "node check: %v", err)
if errors.Is(err, context.Canceled) || !isClusterLeader {
return
}
}
if !canRunBackup {
l.Info("node is not suitable for backup")
log.Info(ctx, "node is not suitable for backup")
if !isClusterLeader {
return
}
@@ -99,7 +93,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,

cfg, err := config.GetProfiledConfig(ctx, a.leadConn, cmd.Profile)
if err != nil {
l.Error("get profiled config: %v", err)
log.Error(ctx, "get profiled config: %v", err)
return
}

@@ -131,7 +125,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
if a.brief.Sharded {
bs, err := topo.GetBalancerStatus(ctx, a.leadConn)
if err != nil {
l.Error("get balancer status: %v", err)
log.Error(ctx, "get balancer status: %v", err)
return
}
if bs.IsOn() {
@@ -140,14 +134,16 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
}
err = bcp.Init(ctx, cmd, opid, balancer)
if err != nil {
l.Error("init meta: %v", err)
log.Error(ctx, "init meta: %v", err)
return
}
l.Debug("init backup meta")
log.Debug(ctx, "init backup meta")

if err = topo.CheckTopoForBackup(ctx, a.leadConn, cmd.Type); err != nil {
ferr := backup.ChangeBackupState(a.leadConn, cmd.Name, defs.StatusError, err.Error())
l.Info("mark backup as %s `%v`: %v", defs.StatusError, err, ferr)
ferr := backup.ChangeBackupState(ctx,
a.leadConn, cmd.Name, defs.StatusError, err.Error())
// TODO: avoid suffix ": <nil>"
log.Info(ctx, "mark backup as %s `%v`: %v", defs.StatusError, err, ferr)
return
}

@@ -160,7 +156,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
src, err := backup.LastIncrementalBackup(ctx, a.leadConn)
if err != nil {
// try backup anyway
l.Warning("define source backup: %v", err)
log.Warn(ctx, "define source backup: %v", err)
} else {
c = make(map[string]float64)
for _, rs := range src.Replsets {
@@ -171,7 +167,7 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,

agents, err := topo.ListSteadyAgents(ctx, a.leadConn)
if err != nil {
l.Error("get agents list: %v", err)
log.Error(ctx, "get agents list: %v", err)
return
}

@@ -181,57 +177,57 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,

shards, err := topo.ClusterMembers(ctx, a.leadConn.MongoClient())
if err != nil {
l.Error("get cluster members: %v", err)
log.Error(ctx, "get cluster members: %v", err)
return
}

for _, sh := range shards {
go func(rs string) {
if err := a.nominateRS(ctx, cmd.Name, rs, nodes.RS(rs)); err != nil {
l.Error("nodes nomination error for %s: %v", rs, err)
log.Error(ctx, "nodes nomination error for %s: %v", rs, err)
}
}(sh.RS)
}
}

nominated, err := a.waitNomination(ctx, cmd.Name)
if err != nil {
l.Error("wait for nomination: %v", err)
log.Error(ctx, "wait for nomination: %v", err)
}
if !nominated {
l.Debug("skip after nomination, probably started by another node")
log.Debug(ctx, "skip after nomination, probably started by another node")
return
}

epoch := ep.TS()
lck := lock.NewLock(a.leadConn, lock.LockHeader{
Type: ctrl.CmdBackup,
Replset: a.brief.SetName,
Node: a.brief.Me,
Replset: defs.Replset(),
Node: defs.NodeID(),
OPID: opid.String(),
Epoch: &epoch,
})

got, err := a.acquireLock(ctx, lck, l)
got, err := a.acquireLock(ctx, lck)
if err != nil {
l.Error("acquiring lock: %v", err)
log.Error(ctx, "acquiring lock: %v", err)
return
}
if !got {
l.Debug("skip: lock not acquired")
log.Debug(ctx, "skip: lock not acquired")
return
}
defer func() {
l.Debug("releasing lock")
log.Debug(ctx, "releasing lock")
err = lck.Release()
if err != nil {
l.Error("unable to release backup lock %v: %v", lck, err)
log.Error(ctx, "unable to release backup lock %v: %v", lck, err)
}
}()

err = backup.SetRSNomineeACK(ctx, a.leadConn, cmd.Name, nodeInfo.SetName, nodeInfo.Me)
if err != nil {
l.Warning("set nominee ack: %v", err)
log.Warn(ctx, "set nominee ack: %v", err)
}

bcpCtx, cancel := context.WithCancel(ctx)
@@ -240,16 +236,16 @@ func (a *Agent) Backup(ctx context.Context, cmd *ctrl.BackupCmd, opid ctrl.OPID,
a.setBcp(&currentBackup{cancel: cancel})
defer a.setBcp(nil)

l.Info("backup started")
err = bcp.Run(bcpCtx, cmd, opid, l)
log.Info(ctx, "backup started")
err = bcp.Run(bcpCtx, cmd, opid)
if err != nil {
if errors.Is(err, storage.ErrCancelled) || errors.Is(err, context.Canceled) {
l.Info("backup was canceled")
log.Info(ctx, "backup was canceled")
} else {
l.Error("backup: %v", err)
log.Error(ctx, "backup: %v", err)
}
} else {
l.Info("backup finished")
log.Info(ctx, "backup finished")
}
}

@@ -269,8 +265,7 @@ func (a *Agent) getValidCandidates(agents []topo.AgentStat, backupType defs.Back
const renominationFrame = 5 * time.Second

func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string) error {
l := log.LogEventFromContext(ctx)
l.Debug("nomination list for %s: %v", rs, nodes)
log.Debug(ctx, "nomination list for %s: %v", rs, nodes)

err := backup.SetRSNomination(ctx, a.leadConn, bcp, rs)
if err != nil {
@@ -283,19 +278,19 @@ func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string
return errors.Wrap(err, "get nomination meta")
}
if nms != nil && len(nms.Ack) > 0 {
l.Debug("bcp nomination: %s won by %s", rs, nms.Ack)
log.Debug(ctx, "bcp nomination: %s won by %s", rs, nms.Ack)
return nil
}

err = backup.SetRSNominees(ctx, a.leadConn, bcp, rs, n)
if err != nil {
return errors.Wrap(err, "set nominees")
}
l.Debug("nomination %s, set candidates %v", rs, n)
log.Debug(ctx, "nomination %s, set candidates %v", rs, n)

err = backup.BackupHB(ctx, a.leadConn, bcp)
if err != nil {
l.Warning("send heartbeat: %v", err)
log.Warn(ctx, "send heartbeat: %v", err)
}

time.Sleep(renominationFrame)
@@ -305,18 +300,19 @@ func (a *Agent) nominateRS(ctx context.Context, bcp, rs string, nodes [][]string
}

func (a *Agent) waitNomination(ctx context.Context, bcp string) (bool, error) {
l := log.LogEventFromContext(ctx)

tk := time.NewTicker(time.Millisecond * 500)
defer tk.Stop()

stop := time.NewTimer(defs.WaitActionStart)
defer stop.Stop()

replsetName := defs.Replset()
nodeID := defs.NodeID()

for {
select {
case <-tk.C:
nm, err := backup.GetRSNominees(ctx, a.leadConn, bcp, a.brief.SetName)
nm, err := backup.GetRSNominees(ctx, a.leadConn, bcp, replsetName)
if err != nil {
if errors.Is(err, errors.ErrNotFound) {
continue
@@ -327,12 +323,12 @@ func (a *Agent) waitNomination(ctx context.Context, bcp string) (bool, error) {
return false, nil
}
for _, n := range nm.Nodes {
if n == a.brief.Me {
if n == nodeID {
return true, nil
}
}
case <-stop.C:
l.Debug("nomination timeout")
log.Debug(ctx, "nomination timeout")
return false, nil
}
}
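For reference, below is a minimal, hypothetical sketch of how such context-carried logging helpers can be wired in Go. It is not PBM's actual log package: only the names visible in the diff above (SetLogEventToContext and the Error/Warn/Info/Debug helpers taking a context) are reused; the Event fields and the output format are assumed for illustration.

    package log

    import (
        "context"
        "fmt"
        stdlog "log"
    )

    // Event carries the command/object/opid fields that were previously
    // passed around as an explicit event argument.
    type Event struct {
        Cmd, Obj, OPID string
    }

    type ctxKey struct{}

    // SetLogEventToContext attaches the event to the context once (e.g. when
    // the agent starts handling a command); callees then log through ctx.
    func SetLogEventToContext(ctx context.Context, e *Event) context.Context {
        return context.WithValue(ctx, ctxKey{}, e)
    }

    func eventFromContext(ctx context.Context) *Event {
        if e, ok := ctx.Value(ctxKey{}).(*Event); ok {
            return e
        }
        return &Event{} // no event in ctx: fall back to an empty one
    }

    func Error(ctx context.Context, msg string, args ...any) { write(ctx, "E", msg, args...) }
    func Warn(ctx context.Context, msg string, args ...any)  { write(ctx, "W", msg, args...) }
    func Info(ctx context.Context, msg string, args ...any)  { write(ctx, "I", msg, args...) }
    func Debug(ctx context.Context, msg string, args ...any) { write(ctx, "D", msg, args...) }

    func write(ctx context.Context, sev, msg string, args ...any) {
        e := eventFromContext(ctx)
        stdlog.Printf("%s [%s/%s/%s] %s", sev, e.Cmd, e.Obj, e.OPID, fmt.Sprintf(msg, args...))
    }

The practical effect in this file shows in the deferred cleanup of Backup: the release-lock path keeps logging against the same event without receiving it as a parameter, because the event rides along in ctx.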