Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 83 additions & 2 deletions pkg/controller/perconaservermongodbrestore/physical.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,46 @@ var anotherOpBackoff = wait.Backoff{
Cap: 15 * time.Minute,
}

const (
annPrepareStarted = "operator.percona.com/physical-prepare-started"
)

// markPrepareStarted annotates the restore CR when prepare begins.
func (r *ReconcilePerconaServerMongoDBRestore) markPrepareStarted(ctx context.Context, cr *psmdbv1.PerconaServerMongoDBRestore) error {
current := &psmdbv1.PerconaServerMongoDBRestore{}
if err := r.client.Get(ctx, types.NamespacedName{Name: cr.Name, Namespace: cr.Namespace}, current); err != nil {
return errors.Wrap(err, "get restore to mark prepare started")
}
anns := current.GetAnnotations()
if anns == nil {
anns = map[string]string{}
}
if _, ok := anns[annPrepareStarted]; ok {
return nil
}
anns[annPrepareStarted] = time.Now().UTC().Format(time.RFC3339)
current.SetAnnotations(anns)
return r.client.Update(ctx, current)
}

// clearPrepareStarted removes the started marker when prepare is done.
func (r *ReconcilePerconaServerMongoDBRestore) clearPrepareStarted(ctx context.Context, cr *psmdbv1.PerconaServerMongoDBRestore) error {
current := &psmdbv1.PerconaServerMongoDBRestore{}
if err := r.client.Get(ctx, types.NamespacedName{Name: cr.Name, Namespace: cr.Namespace}, current); err != nil {
return errors.Wrap(err, "get restore to clear prepare started")
}
anns := current.GetAnnotations()
if anns == nil {
return nil
}
if _, ok := anns[annPrepareStarted]; !ok {
return nil
}
delete(anns, annPrepareStarted)
current.SetAnnotations(anns)
return r.client.Update(ctx, current)
}

// reconcilePhysicalRestore performs a physical restore of a Percona Server for MongoDB from a backup.
func (r *ReconcilePerconaServerMongoDBRestore) reconcilePhysicalRestore(
ctx context.Context,
Expand Down Expand Up @@ -107,18 +147,59 @@ func (r *ReconcilePerconaServerMongoDBRestore) reconcilePhysicalRestore(
}

if cr.Status.State == psmdbv1.RestoreStateWaiting && sfsReady && cr.Spec.PITR != nil {
// If a prepare already started, skip readiness/prepare this cycle unless expired
anns := cr.GetAnnotations()
if anns != nil {
if ts := anns[annPrepareStarted]; ts != "" {
if startedAt, err := time.Parse(time.RFC3339, ts); err == nil {
if time.Since(startedAt) > 10*time.Minute {
// auto-expire the marker to allow retry
if err := r.clearPrepareStarted(ctx, cr); err != nil {
log.V(1).Error(err, "failed to clear expired prepare-started marker")
}
} else {
log.Info("Physical prepare is in progress; skipping readiness/prepare this reconcile")
return status, nil
}
} else {
// malformed timestamp: clear defensively
if err := r.clearPrepareStarted(ctx, cr); err != nil {
log.V(1).Error(err, "failed to clear malformed prepare-started marker")
}
}
}
}

rsReady, err := r.checkIfReplsetsAreReadyForPhysicalRestore(ctx, cluster)
if err != nil {
return status, errors.Wrap(err, "check if replsets are ready for physical restore")
}

if !rsReady {
// Mark prepare start deterministically once
if err := r.markPrepareStarted(ctx, cr); err != nil {
log.V(1).Error(err, "failed to mark prepare started")
}
if err := r.prepareReplsetsForPhysicalRestore(ctx, cluster); err != nil {
return status, errors.Wrap(err, "prepare replsets for physical restore")
}

log.Info("Waiting for replsets to be ready before restore", "ready", rsReady)
return status, nil
// Re-check readiness after preparation to avoid infinite loop
rsReady, err = r.checkIfReplsetsAreReadyForPhysicalRestore(ctx, cluster)
if err != nil {
return status, errors.Wrap(err, "re-check if replsets are ready for physical restore")
}

if !rsReady {
log.Info("Waiting for replsets to be ready before restore", "ready", rsReady)
return status, nil
}

// Prepare finished deterministically when readiness turns true: clear started marker
if err := r.clearPrepareStarted(ctx, cr); err != nil {
log.V(1).Error(err, "failed to clear prepare started")
}
// If rsReady is now true, continue to the next phase
}
}

Expand Down