diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 3f776778..f17ad058 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -348,13 +348,14 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) * // TODO: We can use informer to filter AWs that do not meet the minScheduling spec. // we still need a thread for dispatch duration but minScheduling spec can definetly be moved to an informer -func (qjm *XController) PreemptQueueJobs() { +func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) { ctx := context.Background() + aw := qjm.GetQueueJobEligibleForPreemption(inspectAw) + if aw != nil { - qjobs := qjm.GetQueueJobsEligibleForPreemption() - for _, aw := range qjobs { + //for _, aw := range qjobs { if aw.Status.State == arbv1.AppWrapperStateCompleted || aw.Status.State == arbv1.AppWrapperStateDeleted || aw.Status.State == arbv1.AppWrapperStateFailed { - continue + return } var updateNewJob *arbv1.AppWrapper @@ -362,7 +363,7 @@ func (qjm *XController) PreemptQueueJobs() { newjob, err := qjm.getAppWrapper(aw.Namespace, aw.Name, "[PreemptQueueJobs] get fresh app wrapper") if err != nil { klog.Warningf("[PreemptQueueJobs] failed in retrieving a fresh copy of the app wrapper '%s/%s', err=%v. Will try to preempt on the next run.", aw.Namespace, aw.Name, err) - continue + return } //we need to update AW before analyzing it as a candidate for preemption updateErr := qjm.UpdateQueueJobStatus(newjob) @@ -394,13 +395,11 @@ func (qjm *XController) PreemptQueueJobs() { err := qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- DispatchDeadlineExceeded") if err != nil { klog.Warningf("[PreemptQueueJobs] status update CanRun: false -- DispatchDeadlineExceeded for '%s/%s' failed", newjob.Namespace, newjob.Name) - continue + return } // cannot use cleanup AW, since it puts AW back in running state qjm.qjqueue.AddUnschedulableIfNotPresent(updateNewJob) - // Move to next AW - continue } } @@ -462,7 +461,7 @@ func (qjm *XController) PreemptQueueJobs() { err = qjm.updateStatusInEtcdWithRetry(ctx, updateNewJob, "PreemptQueueJobs - CanRun: false -- MinPodsNotRunning") if err != nil { klog.Warningf("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v", newjob.Namespace, newjob.Name, err) - continue + return } if cleanAppWrapper { @@ -506,98 +505,83 @@ func (qjm *XController) preemptAWJobs(ctx context.Context, preemptAWs []*arbv1.A } } -func (qjm *XController) GetQueueJobsEligibleForPreemption() []*arbv1.AppWrapper { - qjobs := make([]*arbv1.AppWrapper, 0) - - queueJobs, err := qjm.appWrapperLister.AppWrappers("").List(labels.Everything()) - if err != nil { - klog.Errorf("List of queueJobs %+v", qjobs) - return qjobs - } +func (qjm *XController) GetQueueJobEligibleForPreemption(value *arbv1.AppWrapper) *arbv1.AppWrapper { if !qjm.isDispatcher { // Agent Mode - for _, value := range queueJobs { - - // Skip if AW Pending or just entering the system and does not have a state yet. - if (value.Status.State == arbv1.AppWrapperStateEnqueued) || (value.Status.State == "") { - continue - } - if value.Status.State == arbv1.AppWrapperStateActive && value.Spec.SchedSpec.DispatchDuration.Limit > 0 { - awDispatchDurationLimit := value.Spec.SchedSpec.DispatchDuration.Limit - dispatchDuration := value.Status.ControllerFirstDispatchTimestamp.Add(time.Duration(awDispatchDurationLimit) * time.Second) - currentTime := time.Now() - dispatchTimeExceeded := !currentTime.Before(dispatchDuration) + if value.Status.State == arbv1.AppWrapperStateActive && value.Spec.SchedSpec.DispatchDuration.Limit > 0 { + awDispatchDurationLimit := value.Spec.SchedSpec.DispatchDuration.Limit + dispatchDuration := value.Status.ControllerFirstDispatchTimestamp.Add(time.Duration(awDispatchDurationLimit) * time.Second) + currentTime := time.Now() + dispatchTimeExceeded := !currentTime.Before(dispatchDuration) - if dispatchTimeExceeded { - klog.V(8).Infof("Appwrapper Dispatch limit exceeded, currentTime %v, dispatchTimeInSeconds %v", currentTime, dispatchDuration) - value.Spec.SchedSpec.DispatchDuration.Overrun = true - qjobs = append(qjobs, value) - // Got AW which exceeded dispatch runtime limit, move to next AW - continue - } + if dispatchTimeExceeded { + klog.V(8).Infof("Appwrapper Dispatch limit exceeded, currentTime %v, dispatchTimeInSeconds %v", currentTime, dispatchDuration) + value.Spec.SchedSpec.DispatchDuration.Overrun = true + // Got AW which exceeded dispatch runtime limit, move to next AW + return value } - replicas := value.Spec.SchedSpec.MinAvailable + } + replicas := value.Spec.SchedSpec.MinAvailable - if (int(value.Status.Running) + int(value.Status.Succeeded)) < replicas { + if (int(value.Status.Running) + int(value.Status.Succeeded)) < replicas { - // Find the dispatched condition if there is any - numConditions := len(value.Status.Conditions) - var dispatchedCondition arbv1.AppWrapperCondition - dispatchedConditionExists := false + // Find the dispatched condition if there is any + numConditions := len(value.Status.Conditions) + var dispatchedCondition arbv1.AppWrapperCondition + dispatchedConditionExists := false - for i := numConditions - 1; i > 0; i-- { - dispatchedCondition = value.Status.Conditions[i] - if dispatchedCondition.Type != arbv1.AppWrapperCondDispatched { - continue - } - dispatchedConditionExists = true - break + for i := numConditions - 1; i > 0; i-- { + dispatchedCondition = value.Status.Conditions[i] + if dispatchedCondition.Type != arbv1.AppWrapperCondDispatched { + continue } + dispatchedConditionExists = true + break + } - // Check for the minimum age and then skip preempt if current time is not beyond minimum age - // The minimum age is controlled by the requeuing.TimeInSeconds stanza - // For preemption, the time is compared to the last condition or the dispatched condition in the AppWrapper, whichever happened later - lastCondition := value.Status.Conditions[numConditions-1] - var condition arbv1.AppWrapperCondition + // Check for the minimum age and then skip preempt if current time is not beyond minimum age + // The minimum age is controlled by the requeuing.TimeInSeconds stanza + // For preemption, the time is compared to the last condition or the dispatched condition in the AppWrapper, whichever happened later + lastCondition := value.Status.Conditions[numConditions-1] + var condition arbv1.AppWrapperCondition - if dispatchedConditionExists && dispatchedCondition.LastTransitionMicroTime.After(lastCondition.LastTransitionMicroTime.Time) { - condition = dispatchedCondition - } else { - condition = lastCondition - } - var requeuingTimeInSeconds int - if value.Status.RequeueingTimeInSeconds > 0 { - requeuingTimeInSeconds = value.Status.RequeueingTimeInSeconds - } else if value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds == 0 { - requeuingTimeInSeconds = value.Spec.SchedSpec.Requeuing.TimeInSeconds - } else { - requeuingTimeInSeconds = value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds - } + if dispatchedConditionExists && dispatchedCondition.LastTransitionMicroTime.After(lastCondition.LastTransitionMicroTime.Time) { + condition = dispatchedCondition + } else { + condition = lastCondition + } + var requeuingTimeInSeconds int + if value.Status.RequeueingTimeInSeconds > 0 { + requeuingTimeInSeconds = value.Status.RequeueingTimeInSeconds + } else if value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds == 0 { + requeuingTimeInSeconds = value.Spec.SchedSpec.Requeuing.TimeInSeconds + } else { + requeuingTimeInSeconds = value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds + } - minAge := condition.LastTransitionMicroTime.Add(time.Duration(requeuingTimeInSeconds) * time.Second) - currentTime := time.Now() + minAge := condition.LastTransitionMicroTime.Add(time.Duration(requeuingTimeInSeconds) * time.Second) + currentTime := time.Now() - if currentTime.Before(minAge) { - continue - } + if currentTime.Before(minAge) { + return nil + } - if replicas > 0 { - klog.V(3).Infof("AppWrapper '%s/%s' is eligible for preemption Running: %d - minAvailable: %d , Succeeded: %d !!!", value.Namespace, value.Name, value.Status.Running, replicas, value.Status.Succeeded) - qjobs = append(qjobs, value) - } - } else { - // Preempt when schedulingSpec stanza is not set but pods fails scheduling. - // ignore co-scheduler pods - if len(value.Status.PendingPodConditions) > 0 { - klog.V(3).Infof("AppWrapper '%s/%s' is eligible for preemption Running: %d , Succeeded: %d due to failed scheduling !!!", value.Namespace, value.Status.Running, value.Status.Succeeded) - qjobs = append(qjobs, value) - } + if replicas > 0 { + klog.V(3).Infof("AppWrapper '%s/%s' is eligible for preemption Running: %d - minAvailable: %d , Succeeded: %d !!!", value.Namespace, value.Name, value.Status.Running, replicas, value.Status.Succeeded) + return value + } + } else { + // Preempt when schedulingSpec stanza is not set but pods fails scheduling. + // ignore co-scheduler pods + if len(value.Status.PendingPodConditions) > 0 { + klog.V(3).Infof("AppWrapper '%s/%s' is eligible for preemption Running: %d , Succeeded: %d due to failed scheduling !!!", value.Namespace, value.Status.Running, value.Status.Succeeded) + return value } } } - return qjobs + return nil } func (qjm *XController) GetAggregatedResourcesPerGenericItem(cqj *arbv1.AppWrapper) []*clusterstateapi.Resource { @@ -1500,20 +1484,8 @@ func (qjm *XController) backoff(ctx context.Context, q *arbv1.AppWrapper, reason func (cc *XController) Run(stopCh <-chan struct{}) { go cc.appwrapperInformer.Informer().Run(stopCh) - // go cc.qjobResControls[arbv1.ResourceTypePod].Run(stopCh) - cache.WaitForCacheSync(stopCh, cc.appWrapperSynced) - // cache is turned off, issue: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/588 - // update snapshot of ClientStateCache every second - // cc.cache.Run(stopCh) - - // start preempt thread is used to preempt AWs that have partial pods or have reached dispatch duration - go wait.Until(cc.PreemptQueueJobs, 60*time.Second, stopCh) - - // This thread is used to update AW that has completionstatus set to Complete or RunningHoldCompletion - go wait.Until(cc.UpdateQueueJobs, 5*time.Second, stopCh) - if cc.isDispatcher { go wait.Until(cc.UpdateAgent, 2*time.Second, stopCh) // In the Agent? for _, jobClusterAgent := range cc.agentMap { @@ -1536,90 +1508,79 @@ func (qjm *XController) UpdateAgent() { // Move AW from Running to Completed or RunningHoldCompletion // Do not use event queues! Running AWs move to Completed, from which it will never transition to any other state. // State transition: Running->RunningHoldCompletion->Completed -func (qjm *XController) UpdateQueueJobs() { - queueJobs, err := qjm.appWrapperLister.AppWrappers("").List(labels.Everything()) - if err != nil { - klog.Errorf("[UpdateQueueJobs] Failed to get a list of active appwrappers, err=%+v", err) - return - } - containsCompletionStatus := false - for _, newjob := range queueJobs { - for _, item := range newjob.Spec.AggrResources.GenericItems { - if len(item.CompletionStatus) > 0 { - containsCompletionStatus = true - } +func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper) { + + if newjob.Status.State == arbv1.AppWrapperStateActive || newjob.Status.State == arbv1.AppWrapperStateRunningHoldCompletion { + err := qjm.UpdateQueueJobStatus(newjob) + if err != nil { + klog.Errorf("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s, err=%+v", newjob.Name, err) + //TODO: should we really return? + return } - if (newjob.Status.State == arbv1.AppWrapperStateActive || newjob.Status.State == arbv1.AppWrapperStateRunningHoldCompletion) && containsCompletionStatus { - err := qjm.UpdateQueueJobStatus(newjob) - if err != nil { - klog.Errorf("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s, err=%+v", newjob.Name, err) - continue - } - klog.V(6).Infof("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status) - // set appwrapper status to Complete or RunningHoldCompletion - derivedAwStatus := qjm.getAppWrapperCompletionStatus(newjob) + klog.V(6).Infof("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status) + // set appwrapper status to Complete or RunningHoldCompletion + derivedAwStatus := qjm.getAppWrapperCompletionStatus(newjob) - klog.Infof("[UpdateQueueJobs] Got completion status '%s' for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", derivedAwStatus, newjob.Namespace, newjob.Name, newjob.ResourceVersion, - newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed) + klog.Infof("[UpdateQueueJobs] Got completion status '%s' for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", derivedAwStatus, newjob.Namespace, newjob.Name, newjob.ResourceVersion, + newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed) - // Set Appwrapper state to complete if all items in Appwrapper - // are completed - if derivedAwStatus == arbv1.AppWrapperStateRunningHoldCompletion { - newjob.Status.State = derivedAwStatus - var updateQj *arbv1.AppWrapper - index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondRunningHoldCompletion, "SomeItemsCompleted") - if index < 0 { - newjob.Status.QueueJobState = arbv1.AppWrapperCondRunningHoldCompletion - cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "") - newjob.Status.Conditions = append(newjob.Status.Conditions, cond) - newjob.Status.FilterIgnore = true // Update AppWrapperCondRunningHoldCompletion - updateQj = newjob.DeepCopy() - } else { - cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "") - newjob.Status.Conditions[index] = *cond.DeepCopy() - updateQj = newjob.DeepCopy() - } - err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setRunningHoldCompletion") - if err != nil { - // TODO: implement retry - klog.Errorf("[UpdateQueueJobs] Error updating status 'setRunningHoldCompletion' for AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err) - } + // Set Appwrapper state to complete if all items in Appwrapper + // are completed + if derivedAwStatus == arbv1.AppWrapperStateRunningHoldCompletion { + newjob.Status.State = derivedAwStatus + var updateQj *arbv1.AppWrapper + index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondRunningHoldCompletion, "SomeItemsCompleted") + if index < 0 { + newjob.Status.QueueJobState = arbv1.AppWrapperCondRunningHoldCompletion + cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "") + newjob.Status.Conditions = append(newjob.Status.Conditions, cond) + newjob.Status.FilterIgnore = true // Update AppWrapperCondRunningHoldCompletion + updateQj = newjob.DeepCopy() + } else { + cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "") + newjob.Status.Conditions[index] = *cond.DeepCopy() + updateQj = newjob.DeepCopy() } - // Set appwrapper status to complete - if derivedAwStatus == arbv1.AppWrapperStateCompleted { - newjob.Status.State = derivedAwStatus - newjob.Status.CanRun = false - var updateQj *arbv1.AppWrapper - index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondCompleted, "PodsCompleted") - if index < 0 { - newjob.Status.QueueJobState = arbv1.AppWrapperCondCompleted - cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "") - newjob.Status.Conditions = append(newjob.Status.Conditions, cond) - newjob.Status.FilterIgnore = true // Update AppWrapperCondCompleted - updateQj = newjob.DeepCopy() - } else { - cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "") - newjob.Status.Conditions[index] = *cond.DeepCopy() - updateQj = newjob.DeepCopy() - } - err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setCompleted") - if err != nil { - if qjm.quotaManager != nil { - qjm.quotaManager.Release(updateQj) - } - // TODO: Implement retry - klog.Errorf("[UpdateQueueJobs] Error updating status 'setCompleted' AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err) - } + err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setRunningHoldCompletion") + if err != nil { + // TODO: implement retry + klog.Errorf("[UpdateQueueJobs] Error updating status 'setRunningHoldCompletion' for AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err) + } + } + // Set appwrapper status to complete + if derivedAwStatus == arbv1.AppWrapperStateCompleted { + newjob.Status.State = derivedAwStatus + newjob.Status.CanRun = false + var updateQj *arbv1.AppWrapper + index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondCompleted, "PodsCompleted") + if index < 0 { + newjob.Status.QueueJobState = arbv1.AppWrapperCondCompleted + cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "") + newjob.Status.Conditions = append(newjob.Status.Conditions, cond) + newjob.Status.FilterIgnore = true // Update AppWrapperCondCompleted + updateQj = newjob.DeepCopy() + } else { + cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "") + newjob.Status.Conditions[index] = *cond.DeepCopy() + updateQj = newjob.DeepCopy() + } + err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setCompleted") + if err != nil { if qjm.quotaManager != nil { qjm.quotaManager.Release(updateQj) } - // Delete AW from both queue's - qjm.eventQueue.Delete(updateQj) - qjm.qjqueue.Delete(updateQj) + // TODO: Implement retry + klog.Errorf("[UpdateQueueJobs] Error updating status 'setCompleted' AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err) + } + if qjm.quotaManager != nil { + qjm.quotaManager.Release(updateQj) } - klog.Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", newjob.Namespace, newjob.Name, newjob.ResourceVersion, - newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed) + // Delete AW from both queue's + qjm.eventQueue.Delete(updateQj) + qjm.qjqueue.Delete(updateQj) } + klog.Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", newjob.Namespace, newjob.Name, newjob.ResourceVersion, + newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed) } } @@ -1652,6 +1613,87 @@ func (cc *XController) addQueueJob(obj interface{}) { qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj.CreationTimestamp, qj.Status.ControllerFirstTimestamp) klog.V(6).Infof("[Informer-addQJ] enqueue %s &qj=%p Version=%s Status=%+v", qj.Name, qj, qj.ResourceVersion, qj.Status) + + // Requeue the item to be processed again in 30 seconds. + //TODO: tune the frequency of reprocessing an AW + hasCompletionStatus := false + for _, genericItem := range qj.Spec.AggrResources.GenericItems { + if len(genericItem.CompletionStatus) > 0 { + hasCompletionStatus = true + } + } + //When an AW entrs a system with completionstatus keep checking the AW until completed + //updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate + //on stale AWs. This has potential to improve performance at scale. + if hasCompletionStatus { + requeueInterval := 5 * time.Second + key, err := cache.MetaNamespaceKeyFunc(qj) + if err != nil { + klog.Warningf("[Informer-addQJ] Error getting AW %s from cache cannot determine completion status", qj.Name) + //TODO: should we return from this loop? + } + go func() { + for { + time.Sleep(requeueInterval) + latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) + if err != nil && !exists { + klog.Warningf("[Informer-addQJ] Recent copy of AW %s not found in cache", qj.Name) + } else { + var latestAw *arbv1.AppWrapper + if latestObj != nil { + latestAw = latestObj.(*arbv1.AppWrapper) + } else { + latestAw = qj + } + if latestAw.Status.State != arbv1.AppWrapperStateActive && latestAw.Status.State != arbv1.AppWrapperStateEnqueued && latestAw.Status.State != arbv1.AppWrapperStateRunningHoldCompletion { + klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.Name, latestAw.Status.State) + break //Exit the loop + } + // Enqueue the latest copy of the AW. + if (qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus { + cc.UpdateQueueJobs(latestAw) + klog.V(2).Infof("[Informer-addQJ] requeing AW to determine completion status for AW", qj.Name) + } + + } + + } + }() + } + + if qj.Spec.SchedSpec.MinAvailable > 0 { + requeueInterval := 60 * time.Second + key, err := cache.MetaNamespaceKeyFunc(qj) + if err != nil { + klog.Errorf("[Informer-addQJ] Error getting AW %s from cache cannot preempt AW", qj.Name) + //TODO: should we return from this loop? + } + go func() { + for { + time.Sleep(requeueInterval) + latestObj, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key) + if err != nil && !exists { + klog.Warningf("[Informer-addQJ] Recent copy of AW %s not found in cache", qj.Name) + } else { + var latestAw *arbv1.AppWrapper + if latestObj != nil { + latestAw = latestObj.(*arbv1.AppWrapper) + } else { + latestAw = qj + } + if latestAw.Status.State != arbv1.AppWrapperStateActive && latestAw.Status.State != arbv1.AppWrapperStateEnqueued && latestAw.Status.State != arbv1.AppWrapperStateRunningHoldCompletion { + klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.Name, latestAw.Status.State) + break //Exit the loop + } + // Enqueue the latest copy of the AW. + if (qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed) && (qj.Spec.SchedSpec.MinAvailable > 0) { + cc.PreemptQueueJobs(latestAw) + klog.V(2).Infof("[Informer-addQJ] requeing AW to check minScheduling spec for AW", qj.Name) + } + } + } + }() + } cc.enqueue(qj) } @@ -1678,6 +1720,7 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { } klog.V(6).Infof("[Informer-updateQJ] '%s/%s' *Delay=%.6f seconds normal enqueue Version=%s Status=%v", newQJ.Namespace, newQJ.Name, time.Now().Sub(newQJ.Status.ControllerFirstTimestamp.Time).Seconds(), newQJ.ResourceVersion, newQJ.Status) + notBackedoff := true for _, cond := range newQJ.Status.Conditions { if cond.Type == arbv1.AppWrapperCondBackoff { //AWs that have backoff conditions have a delay of 10 seconds before getting added to enqueue. @@ -1688,12 +1731,15 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { } cc.enqueue(newQJ) }) - return + notBackedoff = false } } // cc.eventQueue.Delete(oldObj) - cc.enqueue(newQJ) + if notBackedoff { + cc.enqueue(newQJ) + } + } // a, b arbitrary length numerical string. returns true if a larger than b diff --git a/test/e2e/queue.go b/test/e2e/queue.go index 2809f721..2fd6f1b3 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -141,12 +141,12 @@ var _ = Describe("AppWrapper E2E Test", func() { aw := createJobAWWithInitContainer(context, "aw-job-3-init-container-1", 60, "exponential", 0) appwrappers = append(appwrappers, aw) - err := waitAWPodsCompleted(context, aw, 12*time.Minute) // This test waits for 12 minutes to make sure all PODs complete + err := waitAWPodsCompleted(context, aw, 14*time.Minute) // This test waits for 14 minutes to make sure all PODs complete Expect(err).NotTo(HaveOccurred(), "Waiting for the pods to be completed") }) It("MCAD CPU Requeuing - Deletion After Maximum Requeuing Times Test", func() { - fmt.Fprintf(os.Stdout, "[e2e] MCAD CPU Requeuing Test - Started.\n") + fmt.Fprintf(os.Stdout, "[e2e] MCAD CPU Requeuing - Deletion After Maximum Requeuing Times Test - Started.\n") context := initTestContext() var appwrappers []*arbv1.AppWrapper @@ -513,7 +513,7 @@ var _ = Describe("AppWrapper E2E Test", func() { defer cleanupTestObjectsPtr(context, appwrappersPtr) // This should fill up the worker node and most of the master node - aw := createDeploymentAWwith550CPU(context, appendRandomString("aw-deployment-2-550cpu")) + aw := createDeploymentAWwith550CPU(context, appendRandomString("aw-deployment-2-550cpu-2")) appwrappers = append(appwrappers, aw) err := waitAWPodsReady(context, aw)