Skip to content

Commit 1d5f116

Browse files
committed
Fix race condition: check concurrent limit before GetExposed
Add CanAcceptNewTask() method to datapath.Manager to check if a new task can be accepted before executing expensive GetExposed operation. This prevents multiple tasks from wasting time on GetExposed when the concurrent limit is already reached, which was causing tasks to get ConcurrentLimitExceed errors after spending time on GetExposed. Applied to all controllers: - DataUploadReconciler - DataDownloadReconciler - PodVolumeBackupReconciler - PodVolumeRestoreReconciler Signed-off-by: Andrei Kvapil <[email protected]>
1 parent 554b04e commit 1d5f116

File tree

5 files changed

+46
-0
lines changed

5 files changed

+46
-0
lines changed

pkg/controller/data_download_controller.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,14 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
315315
return ctrl.Result{}, nil
316316
}
317317

318+
// Check if we can accept a new task BEFORE doing expensive GetExposed operation
319+
// This prevents race condition where multiple tasks waste time on GetExposed
320+
// when the concurrent limit is already reached
321+
if !r.dataPathMgr.CanAcceptNewTask(false) {
322+
log.Debug("Data path concurrent limit reached, requeue later")
323+
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
324+
}
325+
318326
result, err := r.restoreExposer.GetExposed(ctx, getDataDownloadOwnerObject(dd), r.client, r.nodeName, dd.Spec.OperationTimeout.Duration)
319327
if err != nil {
320328
return r.errorOut(ctx, dd, err, "restore exposer is not ready", log)

pkg/controller/data_upload_controller.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,15 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
319319
log.Info("Cancellable data path is already started")
320320
return ctrl.Result{}, nil
321321
}
322+
323+
// Check if we can accept a new task BEFORE doing expensive GetExposed operation
324+
// This prevents race condition where multiple tasks waste time on GetExposed
325+
// when the concurrent limit is already reached
326+
if !r.dataPathMgr.CanAcceptNewTask(false) {
327+
log.Debug("Data path concurrent limit reached, requeue later")
328+
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
329+
}
330+
322331
waitExposePara := r.setupWaitExposePara(du)
323332
res, err := ep.GetExposed(ctx, getOwnerObject(du), du.Spec.OperationTimeout.Duration, waitExposePara)
324333
if err != nil {

pkg/controller/pod_volume_backup_controller.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,14 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
269269
return ctrl.Result{}, nil
270270
}
271271

272+
// Check if we can accept a new task BEFORE doing expensive GetExposed operation
273+
// This prevents race condition where multiple tasks waste time on GetExposed
274+
// when the concurrent limit is already reached
275+
if !r.dataPathMgr.CanAcceptNewTask(false) {
276+
log.Debug("Data path concurrent limit reached, requeue later")
277+
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
278+
}
279+
272280
res, err := r.exposer.GetExposed(ctx, getPVBOwnerObject(pvb), r.client, r.nodeName, r.resourceTimeout)
273281
if err != nil {
274282
return r.errorOut(ctx, pvb, err, "exposed PVB is not ready", log)

pkg/controller/pod_volume_restore_controller.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,14 @@ func (r *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req
282282
return ctrl.Result{}, nil
283283
}
284284

285+
// Check if we can accept a new task BEFORE doing expensive GetExposed operation
286+
// This prevents race condition where multiple tasks waste time on GetExposed
287+
// when the concurrent limit is already reached
288+
if !r.dataPathMgr.CanAcceptNewTask(false) {
289+
log.Debug("Data path concurrent limit reached, requeue later")
290+
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
291+
}
292+
285293
res, err := r.exposer.GetExposed(ctx, getPVROwnerObject(pvr), r.client, r.nodeName, r.resourceTimeout)
286294
if err != nil {
287295
return r.errorOut(ctx, pvr, err, "exposed PVR is not ready", log)

pkg/datapath/manager.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,16 @@ func (m *Manager) GetAsyncBR(jobName string) AsyncBR {
9595
return nil
9696
}
9797
}
98+
99+
// CanAcceptNewTask checks if a new task can be accepted based on the concurrent limit.
100+
// This is a lightweight check that doesn't create any resources.
101+
func (m *Manager) CanAcceptNewTask(resume bool) bool {
102+
m.trackerLock.Lock()
103+
defer m.trackerLock.Unlock()
104+
105+
if resume {
106+
return true
107+
}
108+
109+
return len(m.tracker) < m.cocurrentNum
110+
}

0 commit comments

Comments
 (0)