From 81b7243b7c11a69cad626862365b1d410a3ac08e Mon Sep 17 00:00:00 2001 From: NamelessOIer <70872016+NamelessOIer@users.noreply.github.com> Date: Wed, 18 Sep 2024 22:23:36 +0800 Subject: [PATCH 01/10] enhance log --- src/CraneCtld/TaskScheduler.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 0954f676..b6f870b5 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -2159,6 +2159,10 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( if constexpr (kAlgoTraceOutput) { std::vector valid_seg_str; + for (auto& seg : intersected_time_segments) + valid_seg_str.emplace_back(fmt::format( + "[start: {}, end: {})", absl::ToInt64Seconds(seg.start - now), + absl::ToInt64Seconds(seg.start - now + seg.duration))); for (auto& seg : intersected_time_segments) valid_seg_str.emplace_back(fmt::format( "[start: {}, end: {})", absl::ToInt64Seconds(seg.start - now), From ee3cc3ae7dd7c562b6ee6f4a46326db18bcd6f45 Mon Sep 17 00:00:00 2001 From: NamelessOIer <70872016+NamelessOIer@users.noreply.github.com> Date: Mon, 12 Aug 2024 10:10:00 +0800 Subject: [PATCH 02/10] Optimize scheduling algorithm. --- src/CraneCtld/TaskScheduler.cpp | 608 +++--------------- src/CraneCtld/TaskScheduler.h | 74 ++- .../PublicHeader/include/crane/PublicHeader.h | 2 + 3 files changed, 155 insertions(+), 529 deletions(-) diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index b6f870b5..8021bc9d 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -1858,9 +1858,6 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( // Sort all running task in this node by ending time. std::vector> end_time_task_id_vec; - node_selection_info_ref.task_num_node_id_map.emplace( - craned_meta->running_task_resource_map.size(), craned_id); - std::vector running_task_ids_str; for (const auto& [task_id, res] : craned_meta->running_task_resource_map) { const auto& task = running_tasks.at(task_id); @@ -1882,13 +1879,6 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( if constexpr (kAlgoTraceOutput) { CRANE_TRACE("Craned node {} has running tasks: {}", craned_id, absl::StrJoin(running_task_ids_str, ", ")); - } - - std::sort( - end_time_task_id_vec.begin(), end_time_task_id_vec.end(), - [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; }); - - if constexpr (kAlgoTraceOutput) { if (!end_time_task_id_vec.empty()) { std::string str; str.append( @@ -1901,94 +1891,32 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( } } - // Calculate how many resources are available at [now, first task end, + // Calculate delta resources at [now, first task end, // second task end, ...] in this node. - auto& time_avail_res_map = - node_selection_info_ref.node_time_avail_res_map[craned_id]; + auto& time_delta_res_map = + node_selection_info_ref.node_time_delta_res_map[craned_id]; - // Insert [now, inf) interval and thus guarantee time_avail_res_map is not - // null. - time_avail_res_map[now] = craned_meta->res_avail; + time_delta_res_map[now] = craned_meta->res_avail; - if constexpr (kAlgoTraceOutput) { - CRANE_TRACE("Craned {} initial res_avail now: cpu: {}, mem: {}, gres: {}", - craned_id, craned_meta->res_avail.allocatable_res.cpu_count, - craned_meta->res_avail.allocatable_res.memory_bytes, - util::ReadableDresInNode(craned_meta->res_avail)); + for (auto& [end_time, task_id] : end_time_task_id_vec) { + time_delta_res_map[end_time] += + running_tasks.at(task_id)->Resources().at(craned_id); } - { // Limit the scope of `iter` - auto cur_time_iter = time_avail_res_map.find(now); - bool ok; - for (auto& [end_time, task_id] : end_time_task_id_vec) { - const auto& running_task = running_tasks.at(task_id); - if (!time_avail_res_map.contains(end_time)) { - /** - * If there isn't any task that ends at the `end_time`, - * insert an interval [end_time, inf) with the resource of - * the previous interval for the following addition of - * freed resources. - * Note: Such two intervals [5,6), [6,inf) do not overlap with - * each other. - */ - std::tie(cur_time_iter, ok) = - time_avail_res_map.emplace(end_time, cur_time_iter->second); - - if constexpr (kAlgoTraceOutput) { - CRANE_TRACE( - "Insert duration [now+{}s, inf) with resource: " - "cpu: {}, mem: {}, gres: {}", - absl::ToInt64Seconds(end_time - now), - craned_meta->res_avail.allocatable_res.cpu_count, - craned_meta->res_avail.allocatable_res.memory_bytes, - util::ReadableDresInNode(craned_meta->res_avail)); - } - } - - /** - * For the situation in which multiple tasks may end at the same - * time: - * end_time__task_id_vec: [{now+1, 1}, {now+1, 2}, ...] - * But we want only 1 time point in time__avail_res__map: - * {{now+1+1: available_res(now) + available_res(1) + - * available_res(2)}, ...} - */ - cur_time_iter->second += running_task->Resources().at(craned_id); - - if constexpr (kAlgoTraceOutput) { - CRANE_TRACE( - "Craned {} res_avail at now + {}s: cpu: {}, mem: {}, gres: {}; ", - craned_id, absl::ToInt64Seconds(cur_time_iter->first - now), - cur_time_iter->second.allocatable_res.cpu_count, - cur_time_iter->second.allocatable_res.memory_bytes, - util::ReadableDresInNode(cur_time_iter->second)); - } - } + node_selection_info_ref.setCost( + craned_id, craned_meta->running_task_resource_map.size()); - if constexpr (kAlgoTraceOutput) { - std::string str; - str.append(fmt::format("Node ({}, {}): ", partition_id, craned_id)); - auto prev_iter = time_avail_res_map.begin(); - auto iter = std::next(prev_iter); - for (; iter != time_avail_res_map.end(); prev_iter++, iter++) { - str.append( - fmt::format("[ now+{}s , now+{}s ) Available allocatable " - "res: cpu core {}, mem {}, gres {}", - absl::ToInt64Seconds(prev_iter->first - now), - absl::ToInt64Seconds(iter->first - now), - prev_iter->second.allocatable_res.cpu_count, - prev_iter->second.allocatable_res.memory_bytes, - util::ReadableDresInNode(prev_iter->second))); - } - str.append( - fmt::format("[ now+{}s , inf ) Available allocatable " - "res: cpu core {}, mem {}, gres {}", - absl::ToInt64Seconds(prev_iter->first - now), - prev_iter->second.allocatable_res.cpu_count, - prev_iter->second.allocatable_res.memory_bytes, - util::ReadableDresInNode(prev_iter->second))); - CRANE_TRACE("{}", str); + if constexpr (kAlgoTraceOutput) { + std::string str; + ResourceInNode cur_res; + for (auto& [time, res] : time_delta_res_map) { + cur_res += res; + str.append(fmt::format( + "Craned {} res_avail at now+{}s: cpu: {}, mem: {}\n", craned_id, + absl::ToInt64Seconds(time - now), cur_res.allocatable_res.cpu_count, + cur_res.allocatable_res.memory_bytes)); } + CRANE_TRACE("{}", str); } } } @@ -1999,27 +1927,23 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( const CranedMetaContainer::CranedMetaRawMap& craned_meta_map, TaskInCtld* task, absl::Time now, std::list* craned_ids, absl::Time* start_time) { - uint32_t selected_node_cnt = 0; - std::vector intersected_time_segments; - bool first_pass{true}; - - std::list craned_indexes_; + uint32_t node_num_limit = task->node_num; + if constexpr (kAlgoRedundantNode) { + node_num_limit = std::min(task->node_num + 10, task->node_num * 2); + } else { + node_num_limit = task->node_num; + } + std::vector craned_indexes_; - auto task_num_node_id_it = node_selection_info.task_num_node_id_map.begin(); - while (selected_node_cnt < task->node_num && - task_num_node_id_it != - node_selection_info.task_num_node_id_map.end()) { - auto craned_index = task_num_node_id_it->second; + for (auto& [cost, craned_index] : node_selection_info.cost_node_id_set) { if (!partition_meta_ptr.GetExclusivePtr()->craned_ids.contains( craned_index)) { // Todo: Performance issue! We can use cached available node set // for the task when checking task validity in TaskScheduler. - ++task_num_node_id_it; continue; } - - auto& time_avail_res_map = - node_selection_info.node_time_avail_res_map.at(craned_index); + auto& time_delta_res_map = + node_selection_info.node_time_delta_res_map.at(craned_index); auto craned_meta = craned_meta_map.at(craned_index).GetExclusivePtr(); // If any of the follow `if` is true, skip this node. @@ -2046,14 +1970,11 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( } } else { craned_indexes_.emplace_back(craned_index); - ++selected_node_cnt; + if (craned_indexes_.size() >= node_num_limit) break; } - ++task_num_node_id_it; } - if (selected_node_cnt < task->node_num) return false; - CRANE_ASSERT_MSG(selected_node_cnt == task->node_num, - "selected_node_cnt != task->node_num"); + if (craned_indexes_.size() < task->node_num) return false; ResourceV2 allocated_res; task->allocated_res_view.SetToZero(); @@ -2078,288 +1999,53 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( task->SetResources(std::move(allocated_res)); - for (CranedId craned_id : craned_indexes_) { - if constexpr (kAlgoTraceOutput) { - CRANE_TRACE("Find valid time segments for task #{} on craned {}", - task->TaskId(), craned_id); - } - - auto& time_avail_res_map = - node_selection_info.node_time_avail_res_map.at(craned_id); - - // Find all valid time segments in this node for this task. - // The expected start time must exist because all tasks in - // pending_task_map can be run under the amount of all resources in this - // node. At some future time point, all tasks will end and this pending - // task can eventually be run because the total resource of all the nodes - // in `craned_indexes` >= the resource required by the task. - auto res_it = time_avail_res_map.begin(); + std::vector trackers; + std::priority_queue, + std::function> + pq([](const TimeDeltaResTracker* lhs, const TimeDeltaResTracker* rhs) { + return lhs->it->first > rhs->it->first; + }); - std::vector time_segments; - absl::Duration valid_duration; - absl::Time expected_start_time; + for (CranedId craned_id : craned_indexes_) { + auto& time_delta_res_map = + node_selection_info.node_time_delta_res_map.at(craned_id); + auto it = time_delta_res_map.begin(); + trackers.emplace_back(craned_id, it, time_delta_res_map.end(), + &task->Resources().at(craned_id)); + pq.emplace(&trackers.back()); + } - ResourceInNode const& task_node_res = - task->Resources().EachNodeResMap().at(craned_id); - - // Figure out in this craned node, which time segments have sufficient - // resource to run the task. - // For example, if task needs 3 cpu cores, time_avail_res_map is: - // [1, 3], [5, 6], [7, 2], [9, 6] | Format: [start_time, cores] - // Then the valid time segments are: - // [1,6], [9, inf] | Format: [start_time, duration] - bool trying = true; - while (true) { - CRANE_ASSERT(res_it != time_avail_res_map.end()); - if (trying) { - if (task_node_res <= res_it->second) { - trying = false; - expected_start_time = res_it->first; - - if (std::next(res_it) == time_avail_res_map.end()) { - valid_duration = absl::InfiniteDuration(); - time_segments.emplace_back(expected_start_time, valid_duration); - break; - } else { - valid_duration = std::next(res_it)->first - res_it->first; - res_it++; - continue; - } - } else { - if (++res_it == time_avail_res_map.end()) break; - continue; - } - } else { - if (task_node_res <= res_it->second) { - if (std::next(res_it) == time_avail_res_map.end()) { - valid_duration = absl::InfiniteDuration(); - time_segments.emplace_back(expected_start_time, valid_duration); - break; - } else { - valid_duration += std::next(res_it)->first - res_it->first; - res_it++; - continue; - } - } else { - trying = true; - time_segments.emplace_back(expected_start_time, valid_duration); - if (++res_it == time_avail_res_map.end()) - break; - else - continue; - } - } + int satisfied_count = 0; + absl::Time last_time = absl::InfinitePast(); + while (!pq.empty()) { + absl::Time time = pq.top()->it->first; + while (!pq.empty() && pq.top()->it->first == time) { + auto tmp = pq.top(); + pq.pop(); + satisfied_count += tmp->count; + if (tmp->genNext()) pq.emplace(tmp); } - - // Now we have the valid time segments for this node. Find the - // intersection with the set in the previous pass. - if (first_pass) { - intersected_time_segments = std::move(time_segments); - first_pass = false; - - if constexpr (kAlgoTraceOutput) { - std::vector valid_seg_str; - for (auto& seg : intersected_time_segments) - valid_seg_str.emplace_back(fmt::format( - "[start: {}, end: {})", absl::ToInt64Seconds(seg.start - now), - absl::ToInt64Seconds(seg.start - now + seg.duration))); - for (auto& seg : intersected_time_segments) - valid_seg_str.emplace_back(fmt::format( - "[start: {}, end: {})", absl::ToInt64Seconds(seg.start - now), - absl::ToInt64Seconds(seg.start - now + seg.duration))); - CRANE_TRACE("After looping craned {}, valid time segments: {}", - craned_id, absl::StrJoin(valid_seg_str, ", ")); - } + if (satisfied_count < task->node_num) { + last_time = absl::InfinitePast(); } else { - std::vector new_intersected_time_segments; - - for (auto&& seg : time_segments) { - absl::Time start = seg.start; - absl::Time end = seg.start + seg.duration; - - if constexpr (kAlgoTraceOutput) { - CRANE_TRACE( - "Trying to intersect time segment: [start: {}, end: " - "{})", - absl::ToInt64Seconds(start - now), - absl::ToInt64Seconds(start - now + seg.duration)); - } - - // Segment: [start, end) - // e.g. segment.start=5, segment.duration=1s => [5,6) - - // Find the first time point that >= seg.start + seg.duration - auto it2 = std::lower_bound(intersected_time_segments.begin(), - intersected_time_segments.end(), end); - - if (it2 == intersected_time_segments.begin()) { - // If it2 == intersected_time_segments.begin(), - // this time segment has no overlap with any time segment - // in intersected_time_segments. - // Just skip it. - // it2 - // V - // seg *------* *----* .... intersected_time_segments - // *----------* - // ^ - // end - // - // Do nothing under such situation. - continue; - } else { - it2 = std::prev(it2); - // it2 now looks like - // *-----------* seg - // (.......) *--------* *---* intersected_time_segments - // ^ - // it2 (the last time segment to do intersection) - - // Find the first segment in `intersected_time_segments` - // whose end >= seg.start. - // Note: If end == seg.start, there's no intersection. - // - // We first find a segment (it1) in `intersected_time_segments` - // whose start < seg.start and is closet to seg.start... - // There may be 2 situation: - // A1. - // start - // V - // *-------* seg - // *--------* *----* - // ^ - // it1 ( != intersected_time_segment.end() ) - // - // A2. - // *-------* seg *-------* seg - // *--* or *---* - // ^ ^ - // it1 it1 ( == intersected_time_segment.begin()) - auto it1 = std::lower_bound(intersected_time_segments.begin(), - intersected_time_segments.end(), start); - if (it1 == intersected_time_segments.begin()) { - // Case A2: - // - // If it1 == intersected_time_segments.begin(), there is no time - // segment that starts previous to seg.start but there are some - // segments immediately after seg.start. The first one of them is - // the beginning of intersected_time_segments. - // it1 == begin() it2 - // V V - // *---* *----* - // *--------------------* seg - // ^ - // start - } else { - // Case A1: - // If there is an overlap between first segment and `seg`, take - // the intersection. - - // Case A1-1 (end >= it1->start): - // - // std::prev(it1) it1 - // V V - // *----------------------------* *--------* - // *-----------------------------* - // |<-intersected part->| ^ - // ^ | - // start end - // - // OR - // - // Case A1-2 (end < it1->start): - // it0 == std::prev(it1) it1 - // V V - // *--------------------------------* *--------* - // *--------------------* - // |<-intersected part->| - // ^ ^ - // start end - auto it0 = std::prev(it1); - if (it0->start + it0->duration > start) { - // Note: If it0->start + it0->duration == seg.start, - // there's no intersection. - - absl::Duration intersected_duration; - if (end < it0->start + it0->duration) - // Case A1-2 - intersected_duration = end - start; - else - // Case A1-1 - intersected_duration = it0->start + it0->duration - start; - - new_intersected_time_segments.emplace_back(start, - intersected_duration); - } - } - - // |<-- intersected range -->| - // it1 it2 - // v v - // *~~~~~~~* *----* *--------* *~~~~~~~~~~~~* - // *----------------------------------------* - // - // - // Case A1-3 (There's no half-intersected tail segment): - // it2 it1 - // v v - // *-------------* *--------------* - // *----------* - // - // Or - // - // it2 it1 == end() - // v v - // *-------------* - // *----------* - // - // Note: In case A1-3, it2 < it1. - // Thus, termination condition should be (it2 < it1). - for (auto it = it1; it < it2; ++it) - new_intersected_time_segments.emplace_back(it->start, it->duration); - - if (it2 < it1) { - // Case A1-3. - // No half-intersected tail segment should be handled. - } else { - // the last insertion handles the following 2 situations. - // it2 - // *~~~~~~~* *~~~~~~~* *~~~~~~~~* *------------* - // *--------------------------------* - // OR - // it2 - // *~~~~~~~* *~~~~~~~* *~~~~~~~~* *------* - // *-------------------------------------* - new_intersected_time_segments.emplace_back( - it2->start, std::min(it2->duration, end - it2->start)); - } - } + if (last_time == absl::InfinitePast()) { + last_time = time; } - - intersected_time_segments = std::move(new_intersected_time_segments); - - if constexpr (kAlgoTraceOutput) { - std::vector valid_seg_str; - for (auto& seg : intersected_time_segments) { - valid_seg_str.emplace_back(fmt::format( - "[start: {}, end: {})", absl::ToInt64Seconds(seg.start - now), - absl::ToInt64Seconds(seg.start - now + seg.duration))); + if (time - last_time >= task->time_limit || pq.empty()) { + *start_time = last_time; + craned_ids->clear(); + for (auto& tracker : trackers) { + if (tracker.satisfied) { + craned_ids->emplace_back(tracker.craned_id); + if (craned_ids->size() >= task->node_num) break; + } } - CRANE_TRACE("After looping craned {}, valid time segments: {}", - craned_id, absl::StrJoin(valid_seg_str, ", ")); + return true; } } } - *craned_ids = std::move(craned_indexes_); - - // Calculate the earliest start time - for (auto&& seg : intersected_time_segments) { - if (task->time_limit <= seg.duration) { - *start_time = seg.start; - return true; - } - } - return false; } @@ -2502,137 +2188,42 @@ void MinLoadFirst::SubtractTaskResourceNodeSelectionInfo_( const ResourceV2& resources, std::list const& craned_ids, MinLoadFirst::NodeSelectionInfo* node_selection_info) { NodeSelectionInfo& node_info = *node_selection_info; - bool ok; - for (CranedId const& craned_id : craned_ids) { - // Increase the running task num in Craned `crane_id`. - for (auto it = node_info.task_num_node_id_map.begin(); - it != node_info.task_num_node_id_map.end(); ++it) { - if (it->second == craned_id) { - uint32_t num_task = it->first + 1; - node_info.task_num_node_id_map.erase(it); - node_info.task_num_node_id_map.emplace(num_task, craned_id); - break; - } - } + absl::Time task_end_time = expected_start_time + duration; + // Increase the running task num in Craned `crane_id`. + for (CranedId craned_id : craned_ids) { ResourceInNode const& task_res_in_node = resources.at(craned_id); - TimeAvailResMap& time_avail_res_map = - node_info.node_time_avail_res_map[craned_id]; - - absl::Time task_end_time = expected_start_time + duration; - - auto task_duration_begin_it = - time_avail_res_map.upper_bound(expected_start_time); - if (task_duration_begin_it == time_avail_res_map.end()) { - --task_duration_begin_it; - // Situation #1 - // task duration - // |<-------------->| - // *-----------------*----------------------> inf - // ^ - // task_duration_begin_it - // - // *-----------------*----------------|-----> inf - // ^ ^ - // | insert here - // subtract resource here - // - // OR Situation #2 - // task duration - // |<-------------->| - // *-----------------*----------------------> inf - // ^ - // task_duration_begin_it - // - // *-----------------*--|----------------|--> inf - // ^ ^ ^ - // insert here | insert here - // subtract resource here - - TimeAvailResMap::iterator inserted_it; - std::tie(inserted_it, ok) = time_avail_res_map.emplace( - task_end_time, task_duration_begin_it->second); - CRANE_ASSERT_MSG(ok == true, "Insertion must be successful."); - - if (task_duration_begin_it->first == expected_start_time) { - // Situation #1 - CRANE_ASSERT(task_res_in_node <= task_duration_begin_it->second); - task_duration_begin_it->second -= task_res_in_node; - } else { - // Situation #2 - std::tie(inserted_it, ok) = time_avail_res_map.emplace( - expected_start_time, task_duration_begin_it->second); - CRANE_ASSERT_MSG(ok == true, "Insertion must be successful."); - - CRANE_ASSERT(task_res_in_node <= inserted_it->second); - inserted_it->second -= task_res_in_node; - } - } else { - --task_duration_begin_it; - // Situation #3 - // task duration - // |<-------------->| - // *-------*----------*---------*------------ - // ^ ^ - // task_duration_begin_it task_duration_end_it - // *-------*------|---*---------*--|--------- - // ^ ^ ^ ^ ^ - // insert here | | | insert here - // subtract at these points - // - // Or Situation #4 - // task duration - // |<----------------->| - // *-------*----------*--------*------------ - // ^ ^ - // task_duration_begin_it task_duration_end_it - - // std::prev can be used without any check here. - // There will always be one time point (now) before task_end_time. - - if (task_duration_begin_it->first != expected_start_time) { - // Situation #3 (begin) - TimeAvailResMap::iterator inserted_it; - std::tie(inserted_it, ok) = time_avail_res_map.emplace( - expected_start_time, task_duration_begin_it->second); - CRANE_ASSERT_MSG(ok == true, "Insertion must be successful."); - - task_duration_begin_it = inserted_it; - } + node_info.updateCost(craned_id, expected_start_time, task_end_time, + task_res_in_node); + TimeDeltaResMap& time_delta_res_map = + node_info.node_time_delta_res_map[craned_id]; - auto task_duration_end_it = - std::prev(time_avail_res_map.upper_bound(task_end_time)); + time_delta_res_map[expected_start_time] -= task_res_in_node; + time_delta_res_map[task_end_time] += task_res_in_node; - // Subtract the required resources within the interval. - for (auto in_duration_it = task_duration_begin_it; - in_duration_it != task_duration_end_it; in_duration_it++) { - CRANE_ASSERT(task_res_in_node <= in_duration_it->second); - in_duration_it->second -= task_res_in_node; - } - - // Check if we need to insert a time point at - // `task_end_time_plus_1s` Detailed version of why: Assume one task - // end at time x-2, If "x+2" lies in the interval [x, y-1) in - // time__avail_res__map, - // for example, x+2 in [x, y-1) with the available resources amount - // `a`, we need to divide this interval into to two intervals: [x, - // x+2]: a-k, where k is the resource amount that task requires, - // [x+3, y-1]: a - // Therefore, we need to insert a key-value at x+3 to preserve this. - // However, if the length of [x+3, y-1] is 0, or more simply, the - // point x+3 exists, there's no need to save the interval [x+3, - // y-1]. - if (task_duration_end_it->first != task_end_time) { - // Situation #3 (end) - TimeAvailResMap::iterator inserted_it; - std::tie(inserted_it, ok) = time_avail_res_map.emplace( - task_end_time, task_duration_end_it->second); - CRANE_ASSERT_MSG(ok == true, "Insertion must be successful."); - - CRANE_ASSERT(task_res_in_node <= task_duration_end_it->second); - task_duration_end_it->second -= task_res_in_node; + if constexpr (kAlgoTraceOutput) { + std::string str; + str.append( + fmt::format("Subtracted resource from Craned {} at now+{}s to " + "now+{}s: cpu: {}, mem: {}\n", + craned_id, + absl::ToInt64Seconds(expected_start_time - + time_delta_res_map.begin()->first), + absl::ToInt64Seconds(task_end_time - + time_delta_res_map.begin()->first), + task_res_in_node.allocatable_res.cpu_count, + task_res_in_node.allocatable_res.memory_bytes)); + ResourceInNode cur_res; + for (auto& [time, res] : time_delta_res_map) { + cur_res += res; + str.append(fmt::format( + "Craned {} res_avail at now+{}s: cpu: {}, mem: {}\n", craned_id, + absl::ToInt64Seconds(time - time_delta_res_map.begin()->first), + cur_res.allocatable_res.cpu_count, + cur_res.allocatable_res.memory_bytes)); } + CRANE_TRACE("{}", str); } } } @@ -2706,7 +2297,8 @@ CraneErr TaskScheduler::AcquireTaskAttributes(TaskInCtld* task) { task_mem_per_cpu = part_meta.default_mem_per_cpu; } else if (part_meta.max_mem_per_cpu != 0) { // If a task sets its memory bytes, - // check if memory/core ratio is greater than the partition's maximum value. + // check if memory/core ratio is greater than the partition's maximum + // value. task_mem_per_cpu = std::min(task_mem_per_cpu, (double)part_meta.max_mem_per_cpu); } diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index 473580b2..dacaa9ea 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -138,36 +138,69 @@ class MinLoadFirst : public INodeSelectionAlgo { private: static constexpr bool kAlgoTraceOutput = false; + static constexpr bool kAlgoRedundantNode = true; /** - * This map stores how much resource is available - * over time on each Craned node. + * This map stores how much available resource changes over time on each + * Craned node. * * In this map, the time is discretized by 1s and starts from absl::Now(). * {x: a, y: b, z: c, ...} means that - * In time interval [x, y-1], the amount of available resources is a. - * In time interval [y, z-1], the amount of available resources is b. - * In time interval [z, ...], the amount of available resources is c. + * At time x, the amount of available resources is a. + * At time y, the amount of available resources is a + b. + * At time z, the amount of available resources is a + b + c. */ - using TimeAvailResMap = std::map; - struct TimeSegment { - TimeSegment(absl::Time start, absl::Duration duration) - : start(start), duration(duration) {} - absl::Time start; - absl::Duration duration; - - bool operator<(const absl::Time& rhs) const { return this->start < rhs; } + using TimeDeltaResMap = std::map; + struct TimeDeltaResTracker { + const CranedId craned_id; + TimeDeltaResMap::const_iterator it; + const TimeDeltaResMap::const_iterator end; + const ResourceInNode* task_res; + ResourceInNode avail_res; + int count; + bool satisfied{false}; + + TimeDeltaResTracker(const CranedId& craned_id, + const TimeDeltaResMap::const_iterator& it, + const TimeDeltaResMap::const_iterator& end, + const ResourceInNode* task_res) + : craned_id(craned_id), it(it), end(end), task_res(task_res) { + avail_res = it->second; + count = (it != end && *task_res <= avail_res) ? 1 : 0; + } - friend bool operator<(const absl::Time& lhs, const TimeSegment& rhs) { - return lhs < rhs.start; + bool genNext() { + if (count != 0) satisfied = !satisfied; + count = 0; + if (++it == end) return false; + if (*task_res <= avail_res) count -= 1; + avail_res += it->second; + if (*task_res <= avail_res) count += 1; + return true; } }; struct NodeSelectionInfo { - std::multimap - task_num_node_id_map; - std::unordered_map node_time_avail_res_map; + // Craned_ids are sorted by cost. + std::set> cost_node_id_set; + std::unordered_map node_cost_map; + std::unordered_map node_time_delta_res_map; + + // Cost is now the number of tasks running or pending on the node. + // TODO: Better the cost function base on the time-resource map. + void setCost(const CranedId& craned_id, uint32_t cost) { + cost_node_id_set.erase({node_cost_map[craned_id], craned_id}); + node_cost_map[craned_id] = cost; + cost_node_id_set.emplace(cost, craned_id); + } + void updateCost(const CranedId& craned_id, const absl::Time& start_time, + const absl::Time& end_time, const ResourceInNode& resources) { + auto& cost = node_cost_map[craned_id]; + cost_node_id_set.erase({cost, craned_id}); + cost += 1; + cost_node_id_set.emplace(cost, craned_id); + } }; static void CalculateNodeSelectionInfoOfPartition_( @@ -178,8 +211,6 @@ class MinLoadFirst : public INodeSelectionAlgo { const CranedMetaContainer::CranedMetaRawMap& craned_meta_map, NodeSelectionInfo* node_selection_info); - // Input should guarantee that provided nodes in `node_selection_info` has - // enough nodes whose resource is >= task->resource. static bool CalculateRunningNodesAndStartTime_( const NodeSelectionInfo& node_selection_info, const util::Synchronized& partition_meta_ptr, @@ -245,7 +276,8 @@ class TaskScheduler { void TerminateTasksOnCraned(const CranedId& craned_id, uint32_t exit_code); - // Temporary inconsistency may happen. If 'false' is returned, just ignore it. + // Temporary inconsistency may happen. If 'false' is returned, just ignore + // it. void QueryTasksInRam(const crane::grpc::QueryTasksInfoRequest* request, crane::grpc::QueryTasksInfoReply* response); diff --git a/src/Utilities/PublicHeader/include/crane/PublicHeader.h b/src/Utilities/PublicHeader/include/crane/PublicHeader.h index 7caa53ac..b5f7e934 100644 --- a/src/Utilities/PublicHeader/include/crane/PublicHeader.h +++ b/src/Utilities/PublicHeader/include/crane/PublicHeader.h @@ -158,6 +158,8 @@ using SlotId = std::string; // Model the allocatable resources on a craned node. // It contains CPU and memory by now. +// Delta of resources is used, so it can be negative. +// Using unsigned type do not affect the correctness. struct AllocatableResource { cpu_t cpu_count{0}; From 309be0fb5401b93d5567b2ff6f0262d77dc8264b Mon Sep 17 00:00:00 2001 From: NamelessOIer <70872016+NamelessOIer@users.noreply.github.com> Date: Sun, 1 Sep 2024 20:07:35 +0800 Subject: [PATCH 03/10] update cost function. --- src/CraneCtld/TaskScheduler.cpp | 19 ++++++++++++++++--- src/CraneCtld/TaskScheduler.h | 18 +++++++++++------- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 8021bc9d..5f160dde 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -1895,17 +1895,20 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( // second task end, ...] in this node. auto& time_delta_res_map = node_selection_info_ref.node_time_delta_res_map[craned_id]; + node_selection_info_ref.node_res_total_map[craned_id] = + craned_meta->res_total; + node_selection_info_ref.setCost(craned_id, 0); time_delta_res_map[now] = craned_meta->res_avail; for (auto& [end_time, task_id] : end_time_task_id_vec) { time_delta_res_map[end_time] += running_tasks.at(task_id)->Resources().at(craned_id); + node_selection_info_ref.updateCost( + craned_id, now, end_time, + running_tasks.at(task_id)->Resources().at(craned_id)); } - node_selection_info_ref.setCost( - craned_id, craned_meta->running_task_resource_map.size()); - if constexpr (kAlgoTraceOutput) { std::string str; ResourceInNode cur_res; @@ -2026,6 +2029,15 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( satisfied_count += tmp->count; if (tmp->genNext()) pq.emplace(tmp); } + if constexpr (kAlgoTraceOutput) { + CRANE_TRACE("At time now+{}s, {} nodes are satisfied.", + (time - now) / absl::Seconds(1), satisfied_count); + for (auto& tracker : trackers) { + if (tracker.satisfied) { + CRANE_TRACE("Craned {} is satisfied.", tracker.craned_id); + } + } + } if (satisfied_count < task->node_num) { last_time = absl::InfinitePast(); } else { @@ -2041,6 +2053,7 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( if (craned_ids->size() >= task->node_num) break; } } + CRANE_ASSERT(craned_ids->size() == task->node_num); return true; } } diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index dacaa9ea..8a4c6f42 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -183,22 +183,26 @@ class MinLoadFirst : public INodeSelectionAlgo { struct NodeSelectionInfo { // Craned_ids are sorted by cost. - std::set> cost_node_id_set; - std::unordered_map node_cost_map; + std::set> cost_node_id_set; + std::unordered_map node_cost_map; std::unordered_map node_time_delta_res_map; + std::unordered_map node_res_total_map; - // Cost is now the number of tasks running or pending on the node. - // TODO: Better the cost function base on the time-resource map. - void setCost(const CranedId& craned_id, uint32_t cost) { + void setCost(const CranedId& craned_id, uint64_t cost) { cost_node_id_set.erase({node_cost_map[craned_id], craned_id}); node_cost_map[craned_id] = cost; cost_node_id_set.emplace(cost, craned_id); } void updateCost(const CranedId& craned_id, const absl::Time& start_time, - const absl::Time& end_time, const ResourceInNode& resources) { + const absl::Time& end_time, + const ResourceInNode& resources) { auto& cost = node_cost_map[craned_id]; cost_node_id_set.erase({cost, craned_id}); - cost += 1; + auto& total_res = node_res_total_map[craned_id]; + double cpu_rate = + static_cast(resources.allocatable_res.cpu_count) / + static_cast(total_res.allocatable_res.cpu_count); + cost += round((end_time - start_time) / absl::Seconds(1) * cpu_rate); cost_node_id_set.emplace(cost, craned_id); } }; From 3330f6d53c59b278348b3770531401cfd3892f79 Mon Sep 17 00:00:00 2001 From: NamelessOIer <70872016+NamelessOIer@users.noreply.github.com> Date: Mon, 16 Sep 2024 16:44:59 +0800 Subject: [PATCH 04/10] fix wild pointers caused by vector expansion --- src/CraneCtld/TaskScheduler.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 5f160dde..ab32de3b 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -2010,6 +2010,7 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( return lhs->it->first > rhs->it->first; }); + trackers.reserve(craned_indexes_.size()); for (CranedId craned_id : craned_indexes_) { auto& time_delta_res_map = node_selection_info.node_time_delta_res_map.at(craned_id); From 85c7449b702faf875d8d3ce91daaaa24223ad9e1 Mon Sep 17 00:00:00 2001 From: NamelessOIer <70872016+NamelessOIer@users.noreply.github.com> Date: Wed, 25 Sep 2024 21:49:42 +0800 Subject: [PATCH 05/10] remove redundant absl::now() --- src/CraneCtld/TaskScheduler.cpp | 6 +++--- src/CraneCtld/TaskScheduler.h | 9 ++++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index ab32de3b..3a0074c1 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -2095,7 +2095,7 @@ void MinLoadFirst::NodeSelect( std::vector task_id_vec; task_id_vec = m_priority_sorter_->GetOrderedTaskIdList( - *pending_task_map, running_tasks, g_config.ScheduledBatchSize); + *pending_task_map, running_tasks, g_config.ScheduledBatchSize, now); // Now we know, on every node, the # of running tasks (which // doesn't include those we select as the incoming running tasks in the // following code) and how many resources are available at the end of each @@ -2441,8 +2441,8 @@ void TaskScheduler::TerminateTasksOnCraned(const CranedId& craned_id, std::vector MultiFactorPriority::GetOrderedTaskIdList( const OrderedTaskMap& pending_task_map, - const UnorderedTaskMap& running_task_map, size_t limit_num) { - absl::Time now = absl::Now(); + const UnorderedTaskMap& running_task_map, size_t limit_num, + absl::Time now) { CalculateFactorBound_(pending_task_map, running_task_map, now); std::vector> task_priority_vec; diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index 8a4c6f42..396e3412 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -35,7 +35,8 @@ class IPrioritySorter { public: virtual std::vector GetOrderedTaskIdList( const OrderedTaskMap& pending_task_map, - const UnorderedTaskMap& running_task_map, size_t limit) = 0; + const UnorderedTaskMap& running_task_map, size_t limit, + absl::Time now) = 0; virtual ~IPrioritySorter() = default; }; @@ -44,7 +45,8 @@ class BasicPriority : public IPrioritySorter { public: std::vector GetOrderedTaskIdList( const OrderedTaskMap& pending_task_map, - const UnorderedTaskMap& running_task_map, size_t limit) override { + const UnorderedTaskMap& running_task_map, size_t limit, + absl::Time now) override { size_t len = std::min(pending_task_map.size(), limit); std::vector task_id_vec; @@ -69,7 +71,8 @@ class MultiFactorPriority : public IPrioritySorter { public: std::vector GetOrderedTaskIdList( const OrderedTaskMap& pending_task_map, - const UnorderedTaskMap& running_task_map, size_t limit_num) override; + const UnorderedTaskMap& running_task_map, size_t limit_num, + absl::Time now) override; private: struct FactorBound { From 110dee35adb71b52b7dbc38071daaadad4ebd52f Mon Sep 17 00:00:00 2001 From: NamelessOIer <70872016+NamelessOIer@users.noreply.github.com> Date: Wed, 25 Sep 2024 23:03:55 +0800 Subject: [PATCH 06/10] recover TimeAvailResMap to avoid resource addition --- src/CraneCtld/TaskScheduler.cpp | 262 +++++++++++++++++++++++++------- src/CraneCtld/TaskScheduler.h | 48 +++--- 2 files changed, 225 insertions(+), 85 deletions(-) diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 3a0074c1..b26be7a5 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -1876,9 +1876,11 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( running_task_ids_str.emplace_back(std::to_string(task_id)); } + std::sort( + end_time_task_id_vec.begin(), end_time_task_id_vec.end(), + [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; }); + if constexpr (kAlgoTraceOutput) { - CRANE_TRACE("Craned node {} has running tasks: {}", craned_id, - absl::StrJoin(running_task_ids_str, ", ")); if (!end_time_task_id_vec.empty()) { std::string str; str.append( @@ -1893,33 +1895,89 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( // Calculate delta resources at [now, first task end, // second task end, ...] in this node. - auto& time_delta_res_map = - node_selection_info_ref.node_time_delta_res_map[craned_id]; + auto& time_avail_res_map = + node_selection_info_ref.node_time_avail_res_map[craned_id]; node_selection_info_ref.node_res_total_map[craned_id] = craned_meta->res_total; node_selection_info_ref.setCost(craned_id, 0); - time_delta_res_map[now] = craned_meta->res_avail; + // Insert [now, inf) interval and thus guarantee time_avail_res_map is not + // null. + time_avail_res_map[now] = craned_meta->res_avail; - for (auto& [end_time, task_id] : end_time_task_id_vec) { - time_delta_res_map[end_time] += - running_tasks.at(task_id)->Resources().at(craned_id); - node_selection_info_ref.updateCost( - craned_id, now, end_time, - running_tasks.at(task_id)->Resources().at(craned_id)); + if constexpr (kAlgoTraceOutput) { + CRANE_TRACE("Craned {} initial res_avail now: cpu: {}, mem: {}, gres: {}", + craned_id, craned_meta->res_avail.allocatable_res.cpu_count, + craned_meta->res_avail.allocatable_res.memory_bytes, + util::ReadableDresInNode(craned_meta->res_avail)); } - if constexpr (kAlgoTraceOutput) { - std::string str; - ResourceInNode cur_res; - for (auto& [time, res] : time_delta_res_map) { - cur_res += res; - str.append(fmt::format( - "Craned {} res_avail at now+{}s: cpu: {}, mem: {}\n", craned_id, - absl::ToInt64Seconds(time - now), cur_res.allocatable_res.cpu_count, - cur_res.allocatable_res.memory_bytes)); + { // Limit the scope of `iter` + auto cur_time_iter = time_avail_res_map.begin(); + bool ok; + for (auto& [end_time, task_id] : end_time_task_id_vec) { + const auto& running_task = running_tasks.at(task_id); + ResourceInNode const& running_task_res = + running_task->Resources().at(craned_id); + node_selection_info_ref.updateCost(craned_id, now, end_time, + running_task_res); + if (cur_time_iter->first != end_time) { + /** + * If there isn't any task that ends at the `end_time`, + * insert an interval [end_time, inf) with the resource of + * the previous interval for the following addition of + * freed resources. + * Note: Such two intervals [5,6), [6,inf) do not overlap with + * each other. + */ + std::tie(cur_time_iter, ok) = + time_avail_res_map.emplace(end_time, cur_time_iter->second); + } + + /** + * For the situation in which multiple tasks may end at the same + * time: + * end_time__task_id_vec: [{now+1, 1}, {now+1, 2}, ...] + * But we want only 1 time point in time__avail_res__map: + * {{now+1+1: available_res(now) + available_res(1) + + * available_res(2)}, ...} + */ + cur_time_iter->second += running_task_res; + + if constexpr (kAlgoTraceOutput) { + CRANE_TRACE( + "Craned {} res_avail at now + {}s: cpu: {}, mem: {}, gres: {}; ", + craned_id, absl::ToInt64Seconds(cur_time_iter->first - now), + cur_time_iter->second.allocatable_res.cpu_count, + cur_time_iter->second.allocatable_res.memory_bytes, + util::ReadableDresInNode(cur_time_iter->second)); + } + } + + if constexpr (kAlgoTraceOutput) { + std::string str; + str.append(fmt::format("Node ({}, {}): ", partition_id, craned_id)); + auto prev_iter = time_avail_res_map.begin(); + auto iter = std::next(prev_iter); + for (; iter != time_avail_res_map.end(); prev_iter++, iter++) { + str.append( + fmt::format("[ now+{}s , now+{}s ) Available allocatable " + "res: cpu core {}, mem {}, gres {}", + absl::ToInt64Seconds(prev_iter->first - now), + absl::ToInt64Seconds(iter->first - now), + prev_iter->second.allocatable_res.cpu_count, + prev_iter->second.allocatable_res.memory_bytes, + util::ReadableDresInNode(prev_iter->second))); + } + str.append( + fmt::format("[ now+{}s , inf ) Available allocatable " + "res: cpu core {}, mem {}, gres {}", + absl::ToInt64Seconds(prev_iter->first - now), + prev_iter->second.allocatable_res.cpu_count, + prev_iter->second.allocatable_res.memory_bytes, + util::ReadableDresInNode(prev_iter->second))); + CRANE_TRACE("{}", str); } - CRANE_TRACE("{}", str); } } } @@ -1945,8 +2003,8 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( // for the task when checking task validity in TaskScheduler. continue; } - auto& time_delta_res_map = - node_selection_info.node_time_delta_res_map.at(craned_index); + auto& time_avail_res_map = + node_selection_info.node_time_avail_res_map.at(craned_index); auto craned_meta = craned_meta_map.at(craned_index).GetExclusivePtr(); // If any of the follow `if` is true, skip this node. @@ -2002,20 +2060,20 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( task->SetResources(std::move(allocated_res)); - std::vector trackers; - std::priority_queue, - std::function> - pq([](const TimeDeltaResTracker* lhs, const TimeDeltaResTracker* rhs) { + std::vector trackers; + std::priority_queue, + std::function> + pq([](const TimeAvailResTracker* lhs, const TimeAvailResTracker* rhs) { return lhs->it->first > rhs->it->first; }); trackers.reserve(craned_indexes_.size()); for (CranedId craned_id : craned_indexes_) { - auto& time_delta_res_map = - node_selection_info.node_time_delta_res_map.at(craned_id); - auto it = time_delta_res_map.begin(); - trackers.emplace_back(craned_id, it, time_delta_res_map.end(), + auto& time_avail_res_map = + node_selection_info.node_time_avail_res_map.at(craned_id); + auto it = time_avail_res_map.begin(); + trackers.emplace_back(craned_id, it, time_avail_res_map.end(), &task->Resources().at(craned_id)); pq.emplace(&trackers.back()); } @@ -2027,8 +2085,9 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( while (!pq.empty() && pq.top()->it->first == time) { auto tmp = pq.top(); pq.pop(); - satisfied_count += tmp->count; + satisfied_count -= tmp->satisfied; if (tmp->genNext()) pq.emplace(tmp); + satisfied_count += tmp->satisfied; } if constexpr (kAlgoTraceOutput) { CRANE_TRACE("At time now+{}s, {} nodes are satisfied.", @@ -2202,6 +2261,7 @@ void MinLoadFirst::SubtractTaskResourceNodeSelectionInfo_( const ResourceV2& resources, std::list const& craned_ids, MinLoadFirst::NodeSelectionInfo* node_selection_info) { NodeSelectionInfo& node_info = *node_selection_info; + bool ok; absl::Time task_end_time = expected_start_time + duration; @@ -2210,34 +2270,120 @@ void MinLoadFirst::SubtractTaskResourceNodeSelectionInfo_( ResourceInNode const& task_res_in_node = resources.at(craned_id); node_info.updateCost(craned_id, expected_start_time, task_end_time, task_res_in_node); - TimeDeltaResMap& time_delta_res_map = - node_info.node_time_delta_res_map[craned_id]; + TimeAvailResMap& time_avail_res_map = + node_info.node_time_avail_res_map[craned_id]; + + auto task_duration_begin_it = + time_avail_res_map.upper_bound(expected_start_time); + if (task_duration_begin_it == time_avail_res_map.end()) { + --task_duration_begin_it; + // Situation #1 + // task duration + // |<-------------->| + // *-----------------*----------------------> inf + // ^ + // task_duration_begin_it + // + // *-----------------*----------------|-----> inf + // ^ ^ + // | insert here + // subtract resource here + // + // OR Situation #2 + // task duration + // |<-------------->| + // *-----------------*----------------------> inf + // ^ + // task_duration_begin_it + // + // *-----------------*--|----------------|--> inf + // ^ ^ ^ + // insert here | insert here + // subtract resource here + + TimeAvailResMap::iterator inserted_it; + std::tie(inserted_it, ok) = time_avail_res_map.emplace( + task_end_time, task_duration_begin_it->second); + CRANE_ASSERT_MSG(ok == true, "Insertion must be successful."); + + if (task_duration_begin_it->first == expected_start_time) { + // Situation #1 + CRANE_ASSERT(task_res_in_node <= task_duration_begin_it->second); + task_duration_begin_it->second -= task_res_in_node; + } else { + // Situation #2 + std::tie(inserted_it, ok) = time_avail_res_map.emplace( + expected_start_time, task_duration_begin_it->second); + CRANE_ASSERT_MSG(ok == true, "Insertion must be successful."); - time_delta_res_map[expected_start_time] -= task_res_in_node; - time_delta_res_map[task_end_time] += task_res_in_node; + CRANE_ASSERT(task_res_in_node <= inserted_it->second); + inserted_it->second -= task_res_in_node; + } + } else { + --task_duration_begin_it; + // Situation #3 + // task duration + // |<-------------->| + // *-------*----------*---------*------------ + // ^ ^ + // task_duration_begin_it task_duration_end_it + // *-------*------|---*---------*--|--------- + // ^ ^ ^ ^ ^ + // insert here | | | insert here + // subtract at these points + // + // Or Situation #4 + // task duration + // |<----------------->| + // *-------*----------*--------*------------ + // ^ ^ + // task_duration_begin_it task_duration_end_it + + // std::prev can be used without any check here. + // There will always be one time point (now) before task_end_time. + + if (task_duration_begin_it->first != expected_start_time) { + // Situation #3 (begin) + TimeAvailResMap::iterator inserted_it; + std::tie(inserted_it, ok) = time_avail_res_map.emplace( + expected_start_time, task_duration_begin_it->second); + CRANE_ASSERT_MSG(ok == true, "Insertion must be successful."); + + task_duration_begin_it = inserted_it; + } - if constexpr (kAlgoTraceOutput) { - std::string str; - str.append( - fmt::format("Subtracted resource from Craned {} at now+{}s to " - "now+{}s: cpu: {}, mem: {}\n", - craned_id, - absl::ToInt64Seconds(expected_start_time - - time_delta_res_map.begin()->first), - absl::ToInt64Seconds(task_end_time - - time_delta_res_map.begin()->first), - task_res_in_node.allocatable_res.cpu_count, - task_res_in_node.allocatable_res.memory_bytes)); - ResourceInNode cur_res; - for (auto& [time, res] : time_delta_res_map) { - cur_res += res; - str.append(fmt::format( - "Craned {} res_avail at now+{}s: cpu: {}, mem: {}\n", craned_id, - absl::ToInt64Seconds(time - time_delta_res_map.begin()->first), - cur_res.allocatable_res.cpu_count, - cur_res.allocatable_res.memory_bytes)); + auto task_duration_end_it = + std::prev(time_avail_res_map.upper_bound(task_end_time)); + + // Subtract the required resources within the interval. + for (auto in_duration_it = task_duration_begin_it; + in_duration_it != task_duration_end_it; in_duration_it++) { + CRANE_ASSERT(task_res_in_node <= in_duration_it->second); + in_duration_it->second -= task_res_in_node; + } + + // Check if we need to insert a time point at + // `task_end_time_plus_1s` Detailed version of why: Assume one task + // end at time x-2, If "x+2" lies in the interval [x, y-1) in + // time__avail_res__map, + // for example, x+2 in [x, y-1) with the available resources amount + // `a`, we need to divide this interval into to two intervals: [x, + // x+2]: a-k, where k is the resource amount that task requires, + // [x+3, y-1]: a + // Therefore, we need to insert a key-value at x+3 to preserve this. + // However, if the length of [x+3, y-1] is 0, or more simply, the + // point x+3 exists, there's no need to save the interval [x+3, + // y-1]. + if (task_duration_end_it->first != task_end_time) { + // Situation #3 (end) + TimeAvailResMap::iterator inserted_it; + std::tie(inserted_it, ok) = time_avail_res_map.emplace( + task_end_time, task_duration_end_it->second); + CRANE_ASSERT_MSG(ok == true, "Insertion must be successful."); + + CRANE_ASSERT(task_res_in_node <= task_duration_end_it->second); + task_duration_end_it->second -= task_res_in_node; } - CRANE_TRACE("{}", str); } } } diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index 396e3412..9b98a94a 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -144,43 +144,37 @@ class MinLoadFirst : public INodeSelectionAlgo { static constexpr bool kAlgoRedundantNode = true; /** - * This map stores how much available resource changes over time on each - * Craned node. + * This map stores how much resource is available + * over time on each Craned node. * * In this map, the time is discretized by 1s and starts from absl::Now(). * {x: a, y: b, z: c, ...} means that - * At time x, the amount of available resources is a. - * At time y, the amount of available resources is a + b. - * At time z, the amount of available resources is a + b + c. + * In time interval [x, y-1], the amount of available resources is a. + * In time interval [y, z-1], the amount of available resources is b. + * In time interval [z, ...], the amount of available resources is c. */ - using TimeDeltaResMap = std::map; - struct TimeDeltaResTracker { + using TimeAvailResMap = std::map; + struct TimeAvailResTracker { const CranedId craned_id; - TimeDeltaResMap::const_iterator it; - const TimeDeltaResMap::const_iterator end; + TimeAvailResMap::const_iterator it; + const TimeAvailResMap::const_iterator end; const ResourceInNode* task_res; - ResourceInNode avail_res; - int count; - bool satisfied{false}; + bool satisfied; - TimeDeltaResTracker(const CranedId& craned_id, - const TimeDeltaResMap::const_iterator& it, - const TimeDeltaResMap::const_iterator& end, + TimeAvailResTracker(const CranedId& craned_id, + const TimeAvailResMap::const_iterator& begin, + const TimeAvailResMap::const_iterator& end, const ResourceInNode* task_res) - : craned_id(craned_id), it(it), end(end), task_res(task_res) { - avail_res = it->second; - count = (it != end && *task_res <= avail_res) ? 1 : 0; - } + : craned_id(craned_id), + it(begin), + end(end), + task_res(task_res), + satisfied(false) {} bool genNext() { - if (count != 0) satisfied = !satisfied; - count = 0; - if (++it == end) return false; - if (*task_res <= avail_res) count -= 1; - avail_res += it->second; - if (*task_res <= avail_res) count += 1; - return true; + satisfied = *task_res <= it->second; + return ++it != end; } }; @@ -188,7 +182,7 @@ class MinLoadFirst : public INodeSelectionAlgo { // Craned_ids are sorted by cost. std::set> cost_node_id_set; std::unordered_map node_cost_map; - std::unordered_map node_time_delta_res_map; + std::unordered_map node_time_avail_res_map; std::unordered_map node_res_total_map; void setCost(const CranedId& craned_id, uint64_t cost) { From b51fb7d32e19e7dff9d09d90d1cc594aad24565c Mon Sep 17 00:00:00 2001 From: NamelessOIer <70872016+NamelessOIer@users.noreply.github.com> Date: Wed, 25 Sep 2024 23:14:40 +0800 Subject: [PATCH 07/10] add time window and task limit for craned when scheduling. --- src/CraneCtld/TaskScheduler.cpp | 12 ++++++++++++ src/CraneCtld/TaskScheduler.h | 4 ++++ 2 files changed, 16 insertions(+) diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index b26be7a5..355627d4 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -2005,6 +2005,15 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( } auto& time_avail_res_map = node_selection_info.node_time_avail_res_map.at(craned_index); + // Number of tasks is not less than map size. + // When condition is true, the craned has too many tasks. + if (time_avail_res_map.size() >= kAlgoMaxTaskNumPerNode) { + if constexpr (kAlgoTraceOutput) { + CRANE_TRACE("Craned {} has too many tasks. Skipping this craned.", + craned_index); + } + continue; + } auto craned_meta = craned_meta_map.at(craned_index).GetExclusivePtr(); // If any of the follow `if` is true, skip this node. @@ -2082,6 +2091,9 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( absl::Time last_time = absl::InfinitePast(); while (!pq.empty()) { absl::Time time = pq.top()->it->first; + if (time - now > kAlgoMaxTimeWindow) { + return false; + } while (!pq.empty() && pq.top()->it->first == time) { auto tmp = pq.top(); pq.pop(); diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index 9b98a94a..1a19a08b 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -16,6 +16,8 @@ #pragma once +#include + #include "CtldPublicDefs.h" // Precompiled header comes first! @@ -142,6 +144,8 @@ class MinLoadFirst : public INodeSelectionAlgo { private: static constexpr bool kAlgoTraceOutput = false; static constexpr bool kAlgoRedundantNode = true; + static constexpr uint32_t kAlgoMaxTaskNumPerNode = 1000; + static constexpr absl::Duration kAlgoMaxTimeWindow = absl::Hours(24 * 7); /** * This map stores how much resource is available From 75ae91baa718a9a968e8bd4b3773556f5ddbd9b9 Mon Sep 17 00:00:00 2001 From: NamelessOIer <70872016+NamelessOIer@users.noreply.github.com> Date: Fri, 18 Oct 2024 12:39:17 +0800 Subject: [PATCH 08/10] fix node inconsistency --- src/CraneCtld/TaskScheduler.cpp | 56 ++++++--------- src/CraneCtld/TaskScheduler.h | 70 +++++++++++++++++-- .../PublicHeader/include/crane/PublicHeader.h | 2 - 3 files changed, 88 insertions(+), 40 deletions(-) diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 355627d4..e57a8ea2 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -2081,53 +2081,43 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_( for (CranedId craned_id : craned_indexes_) { auto& time_avail_res_map = node_selection_info.node_time_avail_res_map.at(craned_id); - auto it = time_avail_res_map.begin(); - trackers.emplace_back(craned_id, it, time_avail_res_map.end(), + trackers.emplace_back(craned_id, time_avail_res_map.begin(), + time_avail_res_map.end(), &task->Resources().at(craned_id)); pq.emplace(&trackers.back()); } - int satisfied_count = 0; - absl::Time last_time = absl::InfinitePast(); + TrackerList satisfied_trackers(task->node_num); + while (!pq.empty()) { absl::Time time = pq.top()->it->first; if (time - now > kAlgoMaxTimeWindow) { return false; } while (!pq.empty() && pq.top()->it->first == time) { - auto tmp = pq.top(); + auto tracker = pq.top(); pq.pop(); - satisfied_count -= tmp->satisfied; - if (tmp->genNext()) pq.emplace(tmp); - satisfied_count += tmp->satisfied; - } - if constexpr (kAlgoTraceOutput) { - CRANE_TRACE("At time now+{}s, {} nodes are satisfied.", - (time - now) / absl::Seconds(1), satisfied_count); - for (auto& tracker : trackers) { - if (tracker.satisfied) { - CRANE_TRACE("Craned {} is satisfied.", tracker.craned_id); - } + if (tracker->satisfied()) { + satisfied_trackers.try_push_back(tracker, time); + } else { + satisfied_trackers.try_erase(tracker); } - } - if (satisfied_count < task->node_num) { - last_time = absl::InfinitePast(); - } else { - if (last_time == absl::InfinitePast()) { - last_time = time; + if (tracker->genNext()) { + pq.emplace(tracker); } - if (time - last_time >= task->time_limit || pq.empty()) { - *start_time = last_time; - craned_ids->clear(); - for (auto& tracker : trackers) { - if (tracker.satisfied) { - craned_ids->emplace_back(tracker.craned_id); - if (craned_ids->size() >= task->node_num) break; - } - } - CRANE_ASSERT(craned_ids->size() == task->node_num); - return true; + } + if (pq.empty() || satisfied_trackers.kth_time() + task->time_limit <= + pq.top()->it->first) { + *start_time = satisfied_trackers.kth_time(); + craned_ids->clear(); + auto it = satisfied_trackers.kth_elem; + while (it != nullptr) { + craned_ids->emplace_back(it->tracker_ptr->craned_id); + it = it->prev; } + CRANE_ASSERT(*start_time != absl::InfiniteFuture()); + CRANE_ASSERT(craned_ids->size() == task->node_num); + return true; } } diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index 1a19a08b..fb5c88e7 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -164,7 +164,7 @@ class MinLoadFirst : public INodeSelectionAlgo { TimeAvailResMap::const_iterator it; const TimeAvailResMap::const_iterator end; const ResourceInNode* task_res; - bool satisfied; + void* tracker_list_elem; TimeAvailResTracker(const CranedId& craned_id, const TimeAvailResMap::const_iterator& begin, @@ -174,11 +174,71 @@ class MinLoadFirst : public INodeSelectionAlgo { it(begin), end(end), task_res(task_res), - satisfied(false) {} + tracker_list_elem(nullptr) {} - bool genNext() { - satisfied = *task_res <= it->second; - return ++it != end; + bool satisfied() const { return *task_res <= it->second; } + + bool genNext() { return ++it != end; } + }; + + struct TrackerList { + struct TrackerListElem { + TimeAvailResTracker* tracker_ptr; + absl::Time time; + TrackerListElem* prev; + TrackerListElem* next; + bool first_k; + }; + + size_t size; + int node_num; + TrackerListElem* tail; + TrackerListElem* kth_elem; + + TrackerList(int node_num) + : size(0), node_num(node_num), tail(nullptr), kth_elem(nullptr) {} + + void try_push_back(TimeAvailResTracker* it, absl::Time time) { + if (it->tracker_list_elem) return; + TrackerListElem* elem = + new TrackerListElem{it, time, nullptr, nullptr, ++size <= node_num}; + if (tail) { + elem->prev = tail; + tail->next = elem; + } + if (size == node_num) { + kth_elem = elem; + } + tail = elem; + it->tracker_list_elem = elem; + } + + void try_erase(TimeAvailResTracker* it) { + TrackerListElem* elem = + static_cast(it->tracker_list_elem); + if (!elem) return; + if (elem->first_k && kth_elem) { + kth_elem = kth_elem->next; + if (kth_elem) { + kth_elem->first_k = true; + } + } + if (elem->prev) { + elem->prev->next = elem->next; + } + if (elem->next) { + elem->next->prev = elem->prev; + } + if (elem == tail) { + tail = elem->prev; + } + elem->tracker_ptr->tracker_list_elem = nullptr; + delete elem; + --size; + } + + absl::Time kth_time() const { + return kth_elem ? kth_elem->time : absl::InfiniteFuture(); } }; diff --git a/src/Utilities/PublicHeader/include/crane/PublicHeader.h b/src/Utilities/PublicHeader/include/crane/PublicHeader.h index b5f7e934..7caa53ac 100644 --- a/src/Utilities/PublicHeader/include/crane/PublicHeader.h +++ b/src/Utilities/PublicHeader/include/crane/PublicHeader.h @@ -158,8 +158,6 @@ using SlotId = std::string; // Model the allocatable resources on a craned node. // It contains CPU and memory by now. -// Delta of resources is used, so it can be negative. -// Using unsigned type do not affect the correctness. struct AllocatableResource { cpu_t cpu_count{0}; From dd60dcbaa994f664ff017b829b91d5dd373797d1 Mon Sep 17 00:00:00 2001 From: NamelessOIer <70872016+NamelessOIer@users.noreply.github.com> Date: Fri, 18 Oct 2024 13:01:47 +0800 Subject: [PATCH 09/10] remove unnecessary headers. --- src/CraneCtld/TaskScheduler.cpp | 5 ----- src/CraneCtld/TaskScheduler.h | 4 ---- 2 files changed, 9 deletions(-) diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index e57a8ea2..f30ec900 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -16,11 +16,6 @@ #include "TaskScheduler.h" -#include -#include - -#include - #include "AccountManager.h" #include "CranedKeeper.h" #include "CranedMetaContainer.h" diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index fb5c88e7..bd7ac896 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -16,14 +16,10 @@ #pragma once -#include - #include "CtldPublicDefs.h" // Precompiled header comes first! #include "CranedMetaContainer.h" -#include "DbClient.h" -#include "crane/Lock.h" #include "protos/Crane.pb.h" namespace Ctld { From dd2dba8ac4988187d84f07d228775b6f8b3db2a7 Mon Sep 17 00:00:00 2001 From: NamelessOIer <70872016+NamelessOIer@users.noreply.github.com> Date: Fri, 18 Oct 2024 15:22:53 +0800 Subject: [PATCH 10/10] Refactor --- src/CraneCtld/TaskScheduler.cpp | 26 ++++++++++++++++++++------ src/CraneCtld/TaskScheduler.h | 4 +--- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index f30ec900..ab84e8c0 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -1853,7 +1853,6 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( // Sort all running task in this node by ending time. std::vector> end_time_task_id_vec; - std::vector running_task_ids_str; for (const auto& [task_id, res] : craned_meta->running_task_resource_map) { const auto& task = running_tasks.at(task_id); @@ -1867,8 +1866,14 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( absl::Time end_time = std::max(task->StartTime() + task->time_limit, now + absl::Seconds(1)); end_time_task_id_vec.emplace_back(end_time, task_id); + } - running_task_ids_str.emplace_back(std::to_string(task_id)); + if constexpr (kAlgoTraceOutput) { + std::string running_task_ids_str; + for (const auto& [end_time, task_id] : end_time_task_id_vec) + running_task_ids_str.append(fmt::format("{}, ", task_id)); + CRANE_TRACE("Craned node {} has running tasks: {}", craned_id, + running_task_ids_str); } std::sort( @@ -1888,8 +1893,8 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( } } - // Calculate delta resources at [now, first task end, - // second task end, ...] in this node. + // Calculate how many resources are available at [now, first task end, + // second task end, ...] in this node. auto& time_avail_res_map = node_selection_info_ref.node_time_avail_res_map[craned_id]; node_selection_info_ref.node_res_total_map[craned_id] = @@ -1927,6 +1932,16 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( */ std::tie(cur_time_iter, ok) = time_avail_res_map.emplace(end_time, cur_time_iter->second); + + if constexpr (kAlgoTraceOutput) { + CRANE_TRACE( + "Insert duration [now+{}s, inf) with resource: " + "cpu: {}, mem: {}, gres: {}", + absl::ToInt64Seconds(end_time - now), + craned_meta->res_avail.allocatable_res.cpu_count, + craned_meta->res_avail.allocatable_res.memory_bytes, + util::ReadableDresInNode(craned_meta->res_avail)); + } } /** @@ -2454,8 +2469,7 @@ CraneErr TaskScheduler::AcquireTaskAttributes(TaskInCtld* task) { task_mem_per_cpu = part_meta.default_mem_per_cpu; } else if (part_meta.max_mem_per_cpu != 0) { // If a task sets its memory bytes, - // check if memory/core ratio is greater than the partition's maximum - // value. + // check if memory/core ratio is greater than the partition's maximum value. task_mem_per_cpu = std::min(task_mem_per_cpu, (double)part_meta.max_mem_per_cpu); } diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index bd7ac896..595ba3b7 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -153,7 +153,6 @@ class MinLoadFirst : public INodeSelectionAlgo { * In time interval [y, z-1], the amount of available resources is b. * In time interval [z, ...], the amount of available resources is c. */ - using TimeAvailResMap = std::map; struct TimeAvailResTracker { const CranedId craned_id; @@ -337,8 +336,7 @@ class TaskScheduler { void TerminateTasksOnCraned(const CranedId& craned_id, uint32_t exit_code); - // Temporary inconsistency may happen. If 'false' is returned, just ignore - // it. + // Temporary inconsistency may happen. If 'false' is returned, just ignore it. void QueryTasksInRam(const crane::grpc::QueryTasksInfoRequest* request, crane::grpc::QueryTasksInfoReply* response);