diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 355627d47..e57a8ea22 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -2081,53 +2081,43 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( for (CranedId craned_id : craned_indexes_) { auto& time_avail_res_map = node_selection_info.node_time_avail_res_map.at(craned_id); - auto it = time_avail_res_map.begin(); - trackers.emplace_back(craned_id, it, time_avail_res_map.end(), + trackers.emplace_back(craned_id, time_avail_res_map.begin(), + time_avail_res_map.end(), &task->Resources().at(craned_id)); pq.emplace(&trackers.back()); } - int satisfied_count = 0; - absl::Time last_time = absl::InfinitePast(); + TrackerList satisfied_trackers(task->node_num); + while (!pq.empty()) { absl::Time time = pq.top()->it->first; if (time - now > kAlgoMaxTimeWindow) { return false; } while (!pq.empty() && pq.top()->it->first == time) { - auto tmp = pq.top(); + auto tracker = pq.top(); pq.pop(); - satisfied_count -= tmp->satisfied; - if (tmp->genNext()) pq.emplace(tmp); - satisfied_count += tmp->satisfied; - } - if constexpr (kAlgoTraceOutput) { - CRANE_TRACE("At time now+{}s, {} nodes are satisfied.", - (time - now) / absl::Seconds(1), satisfied_count); - for (auto& tracker : trackers) { - if (tracker.satisfied) { - CRANE_TRACE("Craned {} is satisfied.", tracker.craned_id); - } + if (tracker->satisfied()) { + satisfied_trackers.try_push_back(tracker, time); + } else { + satisfied_trackers.try_erase(tracker); } - } - if (satisfied_count < task->node_num) { - last_time = absl::InfinitePast(); - } else { - if (last_time == absl::InfinitePast()) { - last_time = time; + if (tracker->genNext()) { + pq.emplace(tracker); } - if (time - last_time >= task->time_limit || pq.empty()) { - *start_time = last_time; - craned_ids->clear(); - for (auto& tracker : trackers) { - if (tracker.satisfied) { - craned_ids->emplace_back(tracker.craned_id); - if (craned_ids->size() >= task->node_num) break; - } - } - CRANE_ASSERT(craned_ids->size() == task->node_num); - return true; + } + if (pq.empty() || satisfied_trackers.kth_time() + task->time_limit <= + pq.top()->it->first) { + *start_time = satisfied_trackers.kth_time(); + craned_ids->clear(); + auto it = satisfied_trackers.kth_elem; + while (it != nullptr) { + craned_ids->emplace_back(it->tracker_ptr->craned_id); + it = it->prev; } + CRANE_ASSERT(*start_time != absl::InfiniteFuture()); + CRANE_ASSERT(craned_ids->size() == task->node_num); + return true; } } diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index 1a19a08b7..fb5c88e7e 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -164,7 +164,7 @@ class MinLoadFirst : public INodeSelectionAlgo { TimeAvailResMap::const_iterator it; const TimeAvailResMap::const_iterator end; const ResourceInNode* task_res; - bool satisfied; + void* tracker_list_elem; TimeAvailResTracker(const CranedId& craned_id, const TimeAvailResMap::const_iterator& begin, @@ -174,11 +174,71 @@ class MinLoadFirst : public INodeSelectionAlgo { it(begin), end(end), task_res(task_res), - satisfied(false) {} + tracker_list_elem(nullptr) {} - bool genNext() { - satisfied = *task_res <= it->second; - return ++it != end; + bool satisfied() const { return *task_res <= it->second; } + + bool genNext() { return ++it != end; } + }; + + struct TrackerList { + struct TrackerListElem { + TimeAvailResTracker* tracker_ptr; + absl::Time time; + TrackerListElem* prev; + TrackerListElem* next; + bool first_k; + }; + + size_t size; + int node_num; + TrackerListElem* tail; + TrackerListElem* kth_elem; + + TrackerList(int node_num) + : size(0), node_num(node_num), tail(nullptr), kth_elem(nullptr) {} + + void try_push_back(TimeAvailResTracker* it, absl::Time time) { + if (it->tracker_list_elem) return; + TrackerListElem* elem = + new TrackerListElem{it, time, nullptr, nullptr, ++size <= node_num}; + if (tail) { + elem->prev = tail; + tail->next = elem; + } + if (size == node_num) { + kth_elem = elem; + } + tail = elem; + it->tracker_list_elem = elem; + } + + void try_erase(TimeAvailResTracker* it) { + TrackerListElem* elem = + static_cast(it->tracker_list_elem); + if (!elem) return; + if (elem->first_k && kth_elem) { + kth_elem = kth_elem->next; + if (kth_elem) { + kth_elem->first_k = true; + } + } + if (elem->prev) { + elem->prev->next = elem->next; + } + if (elem->next) { + elem->next->prev = elem->prev; + } + if (elem == tail) { + tail = elem->prev; + } + elem->tracker_ptr->tracker_list_elem = nullptr; + delete elem; + --size; + } + + absl::Time kth_time() const { + return kth_elem ? kth_elem->time : absl::InfiniteFuture(); } };