diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 0954f676..ab84e8c0 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -16,11 +16,6 @@ #include "TaskScheduler.h" -#include -#include - -#include - #include "AccountManager.h" #include "CranedKeeper.h" #include "CranedMetaContainer.h" @@ -1858,10 +1853,6 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( // Sort all running task in this node by ending time. std::vector> end_time_task_id_vec; - node_selection_info_ref.task_num_node_id_map.emplace( - craned_meta->running_task_resource_map.size(), craned_id); - - std::vector running_task_ids_str; for (const auto& [task_id, res] : craned_meta->running_task_resource_map) { const auto& task = running_tasks.at(task_id); @@ -1875,13 +1866,14 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( absl::Time end_time = std::max(task->StartTime() + task->time_limit, now + absl::Seconds(1)); end_time_task_id_vec.emplace_back(end_time, task_id); - - running_task_ids_str.emplace_back(std::to_string(task_id)); } if constexpr (kAlgoTraceOutput) { + std::string running_task_ids_str; + for (const auto& [end_time, task_id] : end_time_task_id_vec) + running_task_ids_str.append(fmt::format("{}, ", task_id)); CRANE_TRACE("Craned node {} has running tasks: {}", craned_id, - absl::StrJoin(running_task_ids_str, ", ")); + running_task_ids_str); } std::sort( @@ -1902,9 +1894,12 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( } // Calculate how many resources are available at [now, first task end, - // second task end, ...] in this node. + // second task end, ...] in this node. auto& time_avail_res_map = node_selection_info_ref.node_time_avail_res_map[craned_id]; + node_selection_info_ref.node_res_total_map[craned_id] = + craned_meta->res_total; + node_selection_info_ref.setCost(craned_id, 0); // Insert [now, inf) interval and thus guarantee time_avail_res_map is not // null. @@ -1918,11 +1913,15 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( } { // Limit the scope of `iter` - auto cur_time_iter = time_avail_res_map.find(now); + auto cur_time_iter = time_avail_res_map.begin(); bool ok; for (auto& [end_time, task_id] : end_time_task_id_vec) { const auto& running_task = running_tasks.at(task_id); - if (!time_avail_res_map.contains(end_time)) { + ResourceInNode const& running_task_res = + running_task->Resources().at(craned_id); + node_selection_info_ref.updateCost(craned_id, now, end_time, + running_task_res); + if (cur_time_iter->first != end_time) { /** * If there isn't any task that ends at the `end_time`, * insert an interval [end_time, inf) with the resource of @@ -1953,7 +1952,7 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( * {{now+1+1: available_res(now) + available_res(1) + * available_res(2)}, ...} */ - cur_time_iter->second += running_task->Resources().at(craned_id); + cur_time_iter->second += running_task_res; if constexpr (kAlgoTraceOutput) { CRANE_TRACE( @@ -1999,27 +1998,32 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( const CranedMetaContainer::CranedMetaRawMap& craned_meta_map, TaskInCtld* task, absl::Time now, std::list* craned_ids, absl::Time* start_time) { - uint32_t selected_node_cnt = 0; - std::vector intersected_time_segments; - bool first_pass{true}; - - std::list craned_indexes_; + uint32_t node_num_limit = task->node_num; + if constexpr (kAlgoRedundantNode) { + node_num_limit = std::min(task->node_num + 10, task->node_num * 2); + } else { + node_num_limit = task->node_num; + } + std::vector craned_indexes_; - auto task_num_node_id_it = node_selection_info.task_num_node_id_map.begin(); - while (selected_node_cnt < task->node_num && - task_num_node_id_it != - node_selection_info.task_num_node_id_map.end()) { - auto craned_index = task_num_node_id_it->second; + for (auto& [cost, craned_index] : node_selection_info.cost_node_id_set) { if (!partition_meta_ptr.GetExclusivePtr()->craned_ids.contains( craned_index)) { // Todo: Performance issue! We can use cached available node set // for the task when checking task validity in TaskScheduler. - ++task_num_node_id_it; continue; } - auto& time_avail_res_map = node_selection_info.node_time_avail_res_map.at(craned_index); + // Number of tasks is not less than map size. + // When condition is true, the craned has too many tasks. + if (time_avail_res_map.size() >= kAlgoMaxTaskNumPerNode) { + if constexpr (kAlgoTraceOutput) { + CRANE_TRACE("Craned {} has too many tasks. Skipping this craned.", + craned_index); + } + continue; + } auto craned_meta = craned_meta_map.at(craned_index).GetExclusivePtr(); // If any of the follow `if` is true, skip this node. @@ -2046,14 +2050,11 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( } } else { craned_indexes_.emplace_back(craned_index); - ++selected_node_cnt; + if (craned_indexes_.size() >= node_num_limit) break; } - ++task_num_node_id_it; } - if (selected_node_cnt < task->node_num) return false; - CRANE_ASSERT_MSG(selected_node_cnt == task->node_num, - "selected_node_cnt != task->node_num"); + if (craned_indexes_.size() < task->node_num) return false; ResourceV2 allocated_res; task->allocated_res_view.SetToZero(); @@ -2078,280 +2079,54 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( task->SetResources(std::move(allocated_res)); - for (CranedId craned_id : craned_indexes_) { - if constexpr (kAlgoTraceOutput) { - CRANE_TRACE("Find valid time segments for task #{} on craned {}", - task->TaskId(), craned_id); - } + std::vector trackers; + std::priority_queue, + std::function> + pq([](const TimeAvailResTracker* lhs, const TimeAvailResTracker* rhs) { + return lhs->it->first > rhs->it->first; + }); + trackers.reserve(craned_indexes_.size()); + for (CranedId craned_id : craned_indexes_) { auto& time_avail_res_map = node_selection_info.node_time_avail_res_map.at(craned_id); + trackers.emplace_back(craned_id, time_avail_res_map.begin(), + time_avail_res_map.end(), + &task->Resources().at(craned_id)); + pq.emplace(&trackers.back()); + } - // Find all valid time segments in this node for this task. - // The expected start time must exist because all tasks in - // pending_task_map can be run under the amount of all resources in this - // node. At some future time point, all tasks will end and this pending - // task can eventually be run because the total resource of all the nodes - // in `craned_indexes` >= the resource required by the task. - auto res_it = time_avail_res_map.begin(); - - std::vector time_segments; - absl::Duration valid_duration; - absl::Time expected_start_time; + TrackerList satisfied_trackers(task->node_num); - ResourceInNode const& task_node_res = - task->Resources().EachNodeResMap().at(craned_id); - - // Figure out in this craned node, which time segments have sufficient - // resource to run the task. - // For example, if task needs 3 cpu cores, time_avail_res_map is: - // [1, 3], [5, 6], [7, 2], [9, 6] | Format: [start_time, cores] - // Then the valid time segments are: - // [1,6], [9, inf] | Format: [start_time, duration] - bool trying = true; - while (true) { - CRANE_ASSERT(res_it != time_avail_res_map.end()); - if (trying) { - if (task_node_res <= res_it->second) { - trying = false; - expected_start_time = res_it->first; - - if (std::next(res_it) == time_avail_res_map.end()) { - valid_duration = absl::InfiniteDuration(); - time_segments.emplace_back(expected_start_time, valid_duration); - break; - } else { - valid_duration = std::next(res_it)->first - res_it->first; - res_it++; - continue; - } - } else { - if (++res_it == time_avail_res_map.end()) break; - continue; - } - } else { - if (task_node_res <= res_it->second) { - if (std::next(res_it) == time_avail_res_map.end()) { - valid_duration = absl::InfiniteDuration(); - time_segments.emplace_back(expected_start_time, valid_duration); - break; - } else { - valid_duration += std::next(res_it)->first - res_it->first; - res_it++; - continue; - } - } else { - trying = true; - time_segments.emplace_back(expected_start_time, valid_duration); - if (++res_it == time_avail_res_map.end()) - break; - else - continue; - } - } + while (!pq.empty()) { + absl::Time time = pq.top()->it->first; + if (time - now > kAlgoMaxTimeWindow) { + return false; } - - // Now we have the valid time segments for this node. Find the - // intersection with the set in the previous pass. - if (first_pass) { - intersected_time_segments = std::move(time_segments); - first_pass = false; - - if constexpr (kAlgoTraceOutput) { - std::vector valid_seg_str; - for (auto& seg : intersected_time_segments) - valid_seg_str.emplace_back(fmt::format( - "[start: {}, end: {})", absl::ToInt64Seconds(seg.start - now), - absl::ToInt64Seconds(seg.start - now + seg.duration))); - CRANE_TRACE("After looping craned {}, valid time segments: {}", - craned_id, absl::StrJoin(valid_seg_str, ", ")); - } - } else { - std::vector new_intersected_time_segments; - - for (auto&& seg : time_segments) { - absl::Time start = seg.start; - absl::Time end = seg.start + seg.duration; - - if constexpr (kAlgoTraceOutput) { - CRANE_TRACE( - "Trying to intersect time segment: [start: {}, end: " - "{})", - absl::ToInt64Seconds(start - now), - absl::ToInt64Seconds(start - now + seg.duration)); - } - - // Segment: [start, end) - // e.g. segment.start=5, segment.duration=1s => [5,6) - - // Find the first time point that >= seg.start + seg.duration - auto it2 = std::lower_bound(intersected_time_segments.begin(), - intersected_time_segments.end(), end); - - if (it2 == intersected_time_segments.begin()) { - // If it2 == intersected_time_segments.begin(), - // this time segment has no overlap with any time segment - // in intersected_time_segments. - // Just skip it. - // it2 - // V - // seg *------* *----* .... intersected_time_segments - // *----------* - // ^ - // end - // - // Do nothing under such situation. - continue; - } else { - it2 = std::prev(it2); - // it2 now looks like - // *-----------* seg - // (.......) *--------* *---* intersected_time_segments - // ^ - // it2 (the last time segment to do intersection) - - // Find the first segment in `intersected_time_segments` - // whose end >= seg.start. - // Note: If end == seg.start, there's no intersection. - // - // We first find a segment (it1) in `intersected_time_segments` - // whose start < seg.start and is closet to seg.start... - // There may be 2 situation: - // A1. - // start - // V - // *-------* seg - // *--------* *----* - // ^ - // it1 ( != intersected_time_segment.end() ) - // - // A2. - // *-------* seg *-------* seg - // *--* or *---* - // ^ ^ - // it1 it1 ( == intersected_time_segment.begin()) - auto it1 = std::lower_bound(intersected_time_segments.begin(), - intersected_time_segments.end(), start); - if (it1 == intersected_time_segments.begin()) { - // Case A2: - // - // If it1 == intersected_time_segments.begin(), there is no time - // segment that starts previous to seg.start but there are some - // segments immediately after seg.start. The first one of them is - // the beginning of intersected_time_segments. - // it1 == begin() it2 - // V V - // *---* *----* - // *--------------------* seg - // ^ - // start - } else { - // Case A1: - // If there is an overlap between first segment and `seg`, take - // the intersection. - - // Case A1-1 (end >= it1->start): - // - // std::prev(it1) it1 - // V V - // *----------------------------* *--------* - // *-----------------------------* - // |<-intersected part->| ^ - // ^ | - // start end - // - // OR - // - // Case A1-2 (end < it1->start): - // it0 == std::prev(it1) it1 - // V V - // *--------------------------------* *--------* - // *--------------------* - // |<-intersected part->| - // ^ ^ - // start end - auto it0 = std::prev(it1); - if (it0->start + it0->duration > start) { - // Note: If it0->start + it0->duration == seg.start, - // there's no intersection. - - absl::Duration intersected_duration; - if (end < it0->start + it0->duration) - // Case A1-2 - intersected_duration = end - start; - else - // Case A1-1 - intersected_duration = it0->start + it0->duration - start; - - new_intersected_time_segments.emplace_back(start, - intersected_duration); - } - } - - // |<-- intersected range -->| - // it1 it2 - // v v - // *~~~~~~~* *----* *--------* *~~~~~~~~~~~~* - // *----------------------------------------* - // - // - // Case A1-3 (There's no half-intersected tail segment): - // it2 it1 - // v v - // *-------------* *--------------* - // *----------* - // - // Or - // - // it2 it1 == end() - // v v - // *-------------* - // *----------* - // - // Note: In case A1-3, it2 < it1. - // Thus, termination condition should be (it2 < it1). - for (auto it = it1; it < it2; ++it) - new_intersected_time_segments.emplace_back(it->start, it->duration); - - if (it2 < it1) { - // Case A1-3. - // No half-intersected tail segment should be handled. - } else { - // the last insertion handles the following 2 situations. - // it2 - // *~~~~~~~* *~~~~~~~* *~~~~~~~~* *------------* - // *--------------------------------* - // OR - // it2 - // *~~~~~~~* *~~~~~~~* *~~~~~~~~* *------* - // *-------------------------------------* - new_intersected_time_segments.emplace_back( - it2->start, std::min(it2->duration, end - it2->start)); - } - } + while (!pq.empty() && pq.top()->it->first == time) { + auto tracker = pq.top(); + pq.pop(); + if (tracker->satisfied()) { + satisfied_trackers.try_push_back(tracker, time); + } else { + satisfied_trackers.try_erase(tracker); } - - intersected_time_segments = std::move(new_intersected_time_segments); - - if constexpr (kAlgoTraceOutput) { - std::vector valid_seg_str; - for (auto& seg : intersected_time_segments) { - valid_seg_str.emplace_back(fmt::format( - "[start: {}, end: {})", absl::ToInt64Seconds(seg.start - now), - absl::ToInt64Seconds(seg.start - now + seg.duration))); - } - CRANE_TRACE("After looping craned {}, valid time segments: {}", - craned_id, absl::StrJoin(valid_seg_str, ", ")); + if (tracker->genNext()) { + pq.emplace(tracker); } } - } - - *craned_ids = std::move(craned_indexes_); - - // Calculate the earliest start time - for (auto&& seg : intersected_time_segments) { - if (task->time_limit <= seg.duration) { - *start_time = seg.start; + if (pq.empty() || satisfied_trackers.kth_time() + task->time_limit <= + pq.top()->it->first) { + *start_time = satisfied_trackers.kth_time(); + craned_ids->clear(); + auto it = satisfied_trackers.kth_elem; + while (it != nullptr) { + craned_ids->emplace_back(it->tracker_ptr->craned_id); + it = it->prev; + } + CRANE_ASSERT(*start_time != absl::InfiniteFuture()); + CRANE_ASSERT(craned_ids->size() == task->node_num); return true; } } @@ -2391,7 +2166,7 @@ void MinLoadFirst::NodeSelect( std::vector task_id_vec; task_id_vec = m_priority_sorter_->GetOrderedTaskIdList( - *pending_task_map, running_tasks, g_config.ScheduledBatchSize); + *pending_task_map, running_tasks, g_config.ScheduledBatchSize, now); // Now we know, on every node, the # of running tasks (which // doesn't include those we select as the incoming running tasks in the // following code) and how many resources are available at the end of each @@ -2500,24 +2275,16 @@ void MinLoadFirst::SubtractTaskResourceNodeSelectionInfo_( NodeSelectionInfo& node_info = *node_selection_info; bool ok; - for (CranedId const& craned_id : craned_ids) { - // Increase the running task num in Craned `crane_id`. - for (auto it = node_info.task_num_node_id_map.begin(); - it != node_info.task_num_node_id_map.end(); ++it) { - if (it->second == craned_id) { - uint32_t num_task = it->first + 1; - node_info.task_num_node_id_map.erase(it); - node_info.task_num_node_id_map.emplace(num_task, craned_id); - break; - } - } + absl::Time task_end_time = expected_start_time + duration; + // Increase the running task num in Craned `crane_id`. + for (CranedId craned_id : craned_ids) { ResourceInNode const& task_res_in_node = resources.at(craned_id); + node_info.updateCost(craned_id, expected_start_time, task_end_time, + task_res_in_node); TimeAvailResMap& time_avail_res_map = node_info.node_time_avail_res_map[craned_id]; - absl::Time task_end_time = expected_start_time + duration; - auto task_duration_begin_it = time_avail_res_map.upper_bound(expected_start_time); if (task_duration_begin_it == time_avail_res_map.end()) { @@ -2831,8 +2598,8 @@ void TaskScheduler::TerminateTasksOnCraned(const CranedId& craned_id, std::vector MultiFactorPriority::GetOrderedTaskIdList( const OrderedTaskMap& pending_task_map, - const UnorderedTaskMap& running_task_map, size_t limit_num) { - absl::Time now = absl::Now(); + const UnorderedTaskMap& running_task_map, size_t limit_num, + absl::Time now) { CalculateFactorBound_(pending_task_map, running_task_map, now); std::vector> task_priority_vec; diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index 473580b2..595ba3b7 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -20,8 +20,6 @@ // Precompiled header comes first! #include "CranedMetaContainer.h" -#include "DbClient.h" -#include "crane/Lock.h" #include "protos/Crane.pb.h" namespace Ctld { @@ -35,7 +33,8 @@ class IPrioritySorter { public: virtual std::vector GetOrderedTaskIdList( const OrderedTaskMap& pending_task_map, - const UnorderedTaskMap& running_task_map, size_t limit) = 0; + const UnorderedTaskMap& running_task_map, size_t limit, + absl::Time now) = 0; virtual ~IPrioritySorter() = default; }; @@ -44,7 +43,8 @@ class BasicPriority : public IPrioritySorter { public: std::vector GetOrderedTaskIdList( const OrderedTaskMap& pending_task_map, - const UnorderedTaskMap& running_task_map, size_t limit) override { + const UnorderedTaskMap& running_task_map, size_t limit, + absl::Time now) override { size_t len = std::min(pending_task_map.size(), limit); std::vector task_id_vec; @@ -69,7 +69,8 @@ class MultiFactorPriority : public IPrioritySorter { public: std::vector GetOrderedTaskIdList( const OrderedTaskMap& pending_task_map, - const UnorderedTaskMap& running_task_map, size_t limit_num) override; + const UnorderedTaskMap& running_task_map, size_t limit_num, + absl::Time now) override; private: struct FactorBound { @@ -138,6 +139,9 @@ class MinLoadFirst : public INodeSelectionAlgo { private: static constexpr bool kAlgoTraceOutput = false; + static constexpr bool kAlgoRedundantNode = true; + static constexpr uint32_t kAlgoMaxTaskNumPerNode = 1000; + static constexpr absl::Duration kAlgoMaxTimeWindow = absl::Hours(24 * 7); /** * This map stores how much resource is available @@ -150,24 +154,113 @@ class MinLoadFirst : public INodeSelectionAlgo { * In time interval [z, ...], the amount of available resources is c. */ using TimeAvailResMap = std::map; + struct TimeAvailResTracker { + const CranedId craned_id; + TimeAvailResMap::const_iterator it; + const TimeAvailResMap::const_iterator end; + const ResourceInNode* task_res; + void* tracker_list_elem; + + TimeAvailResTracker(const CranedId& craned_id, + const TimeAvailResMap::const_iterator& begin, + const TimeAvailResMap::const_iterator& end, + const ResourceInNode* task_res) + : craned_id(craned_id), + it(begin), + end(end), + task_res(task_res), + tracker_list_elem(nullptr) {} + + bool satisfied() const { return *task_res <= it->second; } + + bool genNext() { return ++it != end; } + }; - struct TimeSegment { - TimeSegment(absl::Time start, absl::Duration duration) - : start(start), duration(duration) {} - absl::Time start; - absl::Duration duration; + struct TrackerList { + struct TrackerListElem { + TimeAvailResTracker* tracker_ptr; + absl::Time time; + TrackerListElem* prev; + TrackerListElem* next; + bool first_k; + }; + + size_t size; + int node_num; + TrackerListElem* tail; + TrackerListElem* kth_elem; + + TrackerList(int node_num) + : size(0), node_num(node_num), tail(nullptr), kth_elem(nullptr) {} + + void try_push_back(TimeAvailResTracker* it, absl::Time time) { + if (it->tracker_list_elem) return; + TrackerListElem* elem = + new TrackerListElem{it, time, nullptr, nullptr, ++size <= node_num}; + if (tail) { + elem->prev = tail; + tail->next = elem; + } + if (size == node_num) { + kth_elem = elem; + } + tail = elem; + it->tracker_list_elem = elem; + } - bool operator<(const absl::Time& rhs) const { return this->start < rhs; } + void try_erase(TimeAvailResTracker* it) { + TrackerListElem* elem = + static_cast(it->tracker_list_elem); + if (!elem) return; + if (elem->first_k && kth_elem) { + kth_elem = kth_elem->next; + if (kth_elem) { + kth_elem->first_k = true; + } + } + if (elem->prev) { + elem->prev->next = elem->next; + } + if (elem->next) { + elem->next->prev = elem->prev; + } + if (elem == tail) { + tail = elem->prev; + } + elem->tracker_ptr->tracker_list_elem = nullptr; + delete elem; + --size; + } - friend bool operator<(const absl::Time& lhs, const TimeSegment& rhs) { - return lhs < rhs.start; + absl::Time kth_time() const { + return kth_elem ? kth_elem->time : absl::InfiniteFuture(); } }; struct NodeSelectionInfo { - std::multimap - task_num_node_id_map; + // Craned_ids are sorted by cost. + std::set> cost_node_id_set; + std::unordered_map node_cost_map; std::unordered_map node_time_avail_res_map; + std::unordered_map node_res_total_map; + + void setCost(const CranedId& craned_id, uint64_t cost) { + cost_node_id_set.erase({node_cost_map[craned_id], craned_id}); + node_cost_map[craned_id] = cost; + cost_node_id_set.emplace(cost, craned_id); + } + void updateCost(const CranedId& craned_id, const absl::Time& start_time, + const absl::Time& end_time, + const ResourceInNode& resources) { + auto& cost = node_cost_map[craned_id]; + cost_node_id_set.erase({cost, craned_id}); + auto& total_res = node_res_total_map[craned_id]; + double cpu_rate = + static_cast(resources.allocatable_res.cpu_count) / + static_cast(total_res.allocatable_res.cpu_count); + cost += round((end_time - start_time) / absl::Seconds(1) * cpu_rate); + cost_node_id_set.emplace(cost, craned_id); + } }; static void CalculateNodeSelectionInfoOfPartition_( @@ -178,8 +271,6 @@ class MinLoadFirst : public INodeSelectionAlgo { const CranedMetaContainer::CranedMetaRawMap& craned_meta_map, NodeSelectionInfo* node_selection_info); - // Input should guarantee that provided nodes in `node_selection_info` has - // enough nodes whose resource is >= task->resource. static bool CalculateRunningNodesAndStartTime_( const NodeSelectionInfo& node_selection_info, const util::Synchronized& partition_meta_ptr,