diff --git a/protos/PublicDefs.proto b/protos/PublicDefs.proto index c7fdc0fbe..e36659173 100644 --- a/protos/PublicDefs.proto +++ b/protos/PublicDefs.proto @@ -109,6 +109,23 @@ enum InteractiveTaskType { Crun = 1; } +enum DependencyType { + AFTER = 0; + AFTER_ANY = 1; + AFTER_OK = 2; + AFTER_NOT_OK = 3; +} + +message DependencyCondition { + uint32 task_id = 1; + DependencyType type = 2; +} + +message Dependencies{ + repeated DependencyCondition dependencies = 1; + bool depend_all = 2; +} + message TaskToCtld { /* -------- Fields that are set at the submission time. ------- */ google.protobuf.Duration time_limit = 1; @@ -128,6 +145,8 @@ message TaskToCtld { bool requeue_if_failed = 12; bool get_user_env = 13; + + Dependencies dependencies = 14; oneof payload { BatchTaskAdditionalMeta batch_meta = 21; @@ -169,6 +188,11 @@ message RuntimeAttrOfTask { bool held = 18; ResourceV2 resources = 19; + bool dependency_ok = 20; + // If this task depends all dependencies, store satisfied dependencies. + // If this task depends any dependency, store unsatisfied dependencies. + // TaskId must be stored in order to restore. + repeated uint32 dependency_ids = 21; } message TaskToD { @@ -247,6 +271,7 @@ message TaskInfo { ResourceView res_view = 17; // Dynamic task information + uint32 dependency_state = 29; bool held = 30; TaskStatus status = 31; diff --git a/src/CraneCtld/CMakeLists.txt b/src/CraneCtld/CMakeLists.txt index 190766a2a..e085f5c38 100644 --- a/src/CraneCtld/CMakeLists.txt +++ b/src/CraneCtld/CMakeLists.txt @@ -12,6 +12,8 @@ add_executable(cranectld CranedMetaContainer.cpp AccountManager.h AccountManager.cpp + DependencyManager.h + DependencyManager.cpp EmbeddedDbClient.cpp EmbeddedDbClient.h CraneCtld.cpp diff --git a/src/CraneCtld/CraneCtld.cpp b/src/CraneCtld/CraneCtld.cpp index bfb30ee73..e21285ae3 100644 --- a/src/CraneCtld/CraneCtld.cpp +++ b/src/CraneCtld/CraneCtld.cpp @@ -29,6 +29,7 @@ #include "CranedMetaContainer.h" #include "CtldGrpcServer.h" #include "DbClient.h" +#include "DependencyManager.h" #include "EmbeddedDbClient.h" #include "TaskScheduler.h" #include "crane/Network.h" @@ -604,6 +605,7 @@ void DestroyCtldGlobalVariables() { g_task_scheduler.reset(); g_craned_keeper.reset(); + g_dependency_manager.reset(); // In case that spdlog is destructed before g_embedded_db_client->Close() // in which log function is called. @@ -725,6 +727,8 @@ void InitializeCtldGlobalVariables() { } } + g_dependency_manager = std::make_unique(); + g_task_scheduler = std::make_unique(); ok = g_task_scheduler->Init(); if (!ok) { diff --git a/src/CraneCtld/CtldGrpcServer.cpp b/src/CraneCtld/CtldGrpcServer.cpp index 39c5e101f..fbcf23c93 100644 --- a/src/CraneCtld/CtldGrpcServer.cpp +++ b/src/CraneCtld/CtldGrpcServer.cpp @@ -36,15 +36,14 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTask( auto result = m_ctld_server_->SubmitTaskToScheduler(std::move(task)); if (result.has_value()) { - task_id_t id = result.value().get(); - if (id != 0) { + auto submit_result = result.value().get(); + if (submit_result.has_value()) { response->set_ok(true); + task_id_t id = submit_result.value(); response->set_task_id(id); } else { response->set_ok(false); - response->set_reason( - "System error occurred or " - "the number of pending tasks exceeded maximum value."); + response->set_reason(std::string(CraneErrStr(submit_result.error()))); } } else { response->set_ok(false); @@ -58,7 +57,9 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTasks( grpc::ServerContext *context, const crane::grpc::SubmitBatchTasksRequest *request, crane::grpc::SubmitBatchTasksReply *response) { - std::vector, std::string>> results; + std::vector>, + std::string>> + results; uint32_t task_count = request->count(); const auto &task_to_ctld = request->task(); @@ -73,9 +74,14 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTasks( } for (auto &res : results) { - if (res.has_value()) - response->mutable_task_id_list()->Add(res.value().get()); - else + if (res.has_value()) { + auto submit_res = res.value().get(); + if (submit_res.has_value()) + response->mutable_task_id_list()->Add(submit_res.value()); + else + response->mutable_reason_list()->Add( + std::string(CraneErrStr(submit_res.error()))); + } else response->mutable_reason_list()->Add(res.error()); } @@ -1086,8 +1092,13 @@ grpc::Status CraneCtldServiceImpl::CforedStream( m_ctld_server_->SubmitTaskToScheduler(std::move(task)); result::result result; if (submit_result.has_value()) { - result = result::result{ - submit_result.value().get()}; + auto submit_final_result = submit_result.value().get(); + if (submit_final_result.has_value()) { + result = result::result{ + submit_final_result.value()}; + } else { + result = result::fail(CraneErrStr(submit_final_result.error())); + } } else { result = result::fail(submit_result.error()); } @@ -1210,7 +1221,7 @@ CtldServer::CtldServer(const Config::CraneCtldListenConf &listen_conf) { signal(SIGINT, &CtldServer::signal_handler_func); } -result::result, std::string> +result::result>, std::string> CtldServer::SubmitTaskToScheduler(std::unique_ptr task) { CraneErr err; @@ -1260,7 +1271,7 @@ CtldServer::SubmitTaskToScheduler(std::unique_ptr task) { if (err == CraneErr::kOk) { task->SetSubmitTime(absl::Now()); - std::future future = + std::future> future = g_task_scheduler->SubmitTaskAsync(std::move(task)); return {std::move(future)}; } diff --git a/src/CraneCtld/CtldGrpcServer.h b/src/CraneCtld/CtldGrpcServer.h index 1dba3de04..e50926fb3 100644 --- a/src/CraneCtld/CtldGrpcServer.h +++ b/src/CraneCtld/CtldGrpcServer.h @@ -42,9 +42,8 @@ class CforedStreamWriter { crane::grpc::StreamCforedRequest> *stream) : m_stream_(stream), m_valid_(true) {} - bool WriteTaskIdReply( - pid_t calloc_pid, - result::result res) { + bool WriteTaskIdReply(pid_t calloc_pid, + result::result res) { LockGuard guard(&m_stream_mtx_); if (!m_valid_) return false; @@ -64,8 +63,11 @@ class CforedStreamWriter { return m_stream_->Write(reply); } - bool WriteTaskResAllocReply(task_id_t task_id, - result::result>, std::string> res) { + bool WriteTaskResAllocReply( + task_id_t task_id, + result::result>, + std::string> + res) { LockGuard guard(&m_stream_mtx_); if (!m_valid_) return false; @@ -76,8 +78,12 @@ class CforedStreamWriter { if (res.has_value()) { task_res_alloc_reply->set_ok(true); - task_res_alloc_reply->set_allocated_craned_regex(std::move(res.value().first)); - std::ranges::for_each(res.value().second,[&task_res_alloc_reply](const auto& craned_id){task_res_alloc_reply->add_craned_ids(craned_id);}); + task_res_alloc_reply->set_allocated_craned_regex( + std::move(res.value().first)); + std::ranges::for_each(res.value().second, + [&task_res_alloc_reply](const auto &craned_id) { + task_res_alloc_reply->add_craned_ids(craned_id); + }); } else { task_res_alloc_reply->set_ok(false); task_res_alloc_reply->set_failure_reason(std::move(res.error())); @@ -89,7 +95,8 @@ class CforedStreamWriter { bool WriteTaskCompletionAckReply(task_id_t task_id) { LockGuard guard(&m_stream_mtx_); if (!m_valid_) return false; - CRANE_TRACE("Sending TaskCompletionAckReply to cfored of task id {}",task_id); + CRANE_TRACE("Sending TaskCompletionAckReply to cfored of task id {}", + task_id); StreamCtldReply reply; reply.set_type(StreamCtldReply::TASK_COMPLETION_ACK_REPLY); @@ -271,8 +278,8 @@ class CtldServer { inline void Wait() { m_server_->Wait(); } - result::result, std::string> SubmitTaskToScheduler( - std::unique_ptr task); + result::result>, std::string> + SubmitTaskToScheduler(std::unique_ptr task); private: template env; std::string cwd; @@ -296,6 +298,8 @@ struct TaskInCtld { crane::grpc::TaskStatus status; uint32_t exit_code; bool held{false}; + bool dependency_ok{false}; + std::vector dependency_ids; // If this task is PENDING, start_time is either not set (default constructed) // or an estimated start time. @@ -446,6 +450,21 @@ struct TaskInCtld { resources = std::move(val); } ResourceV2 const& Resources() const { return resources; } + + void SetDependencyOK() { + dependency_ok = true; + runtime_attr.set_dependency_ok(true); + } + bool HasDependency() const { + return dependencies.dependencies_size() != 0 && !dependency_ok; + } + void DependencyAdd(const std::vector& val) { + dependency_ids.insert(dependency_ids.end(), val.begin(), val.end()); + for (auto const& id : val) runtime_attr.add_dependency_ids(id); + } + bool NoWaitingDependency() const { + return dependency_ids.size() == dependencies.dependencies_size(); + } void SetFieldsByTaskToCtld(crane::grpc::TaskToCtld const& val) { task_to_ctld = val; @@ -492,6 +511,8 @@ struct TaskInCtld { qos = val.qos(); get_user_env = val.get_user_env(); + + dependencies = val.dependencies(); } void SetFieldsByRuntimeAttr(crane::grpc::RuntimeAttrOfTask const& val) { @@ -507,6 +528,10 @@ struct TaskInCtld { status = runtime_attr.status(); held = runtime_attr.held(); + dependency_ok = runtime_attr.dependency_ok(); + dependency_ids.assign(runtime_attr.dependency_ids().begin(), + runtime_attr.dependency_ids().end()); + if (status != crane::grpc::TaskStatus::Pending) { craned_ids.assign(runtime_attr.craned_ids().begin(), runtime_attr.craned_ids().end()); diff --git a/src/CraneCtld/DependencyManager.cpp b/src/CraneCtld/DependencyManager.cpp new file mode 100644 index 000000000..5f5e27377 --- /dev/null +++ b/src/CraneCtld/DependencyManager.cpp @@ -0,0 +1,231 @@ +/** + * Copyright (c) 2023 Peking University and Peking University + * Changsha Institute for Computing and Digital Economy + * + * CraneSched is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of + * the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, + * WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +#include "DependencyManager.h" + +#include +#include + +#include "EmbeddedDbClient.h" +#include "crane/Logger.h" +#include "crane/PublicHeader.h" +#include "protos/PublicDefs.pb.h" + +namespace Ctld { + +DependencyManager::DependencyManager() {} + +result::result DependencyManager::addDependencies( + task_id_t task_id, crane::grpc::Dependencies& dependencies) { + std::shared_ptr self = std::make_shared(); + self->task_id = task_id; + self->depend_all = dependencies.depend_all(); + + std::vector, crane::grpc::DependencyType>> + dependent_infos; + { + LockGuard global_lock(&g_dependency_mutex); + for (const auto& dep : dependencies.dependencies()) { + task_id_t dep_id = dep.task_id(); + auto it = g_all_task_info.find(dep_id); + if (it == g_all_task_info.end()) { + return result::fail(fmt::format( + "Dependency task #{} does not exist or has ended", dep_id)); + } + dependent_infos.push_back({it->second, dep.type()}); + } + + if (g_all_task_info.find(task_id) != g_all_task_info.end()) { + return result::fail(fmt::format("Task #{} already exists", task_id)); + } + g_all_task_info[task_id] = self; + } + + int succeeded = 0; + int failed = 0; + + crane::grpc::Dependencies new_dependencies; + new_dependencies.set_depend_all(dependencies.depend_all()); + for (int i = 0; i < dependent_infos.size(); ++i) { + auto& [dep_info, dep_type] = dependent_infos[i]; + LockGuard lock(&dep_info->mutex); + if (dep_info->listStatus[dep_type] == 0) { + dep_info->Dependents[dep_type].push_back(self); + new_dependencies.add_dependencies()->CopyFrom( + dependencies.dependencies(i)); + } else { + dep_info->listStatus[dep_type] == Info::SUCCEED ? ++succeeded : ++failed; + } + } + + if (failed > 0 && + (dependencies.depend_all() || failed >= dependent_infos.size())) { + // current task has no dependent before the function returns + // just try to remove the task from the global map + // then no dependent of current task will be added + LockGuard lock(&g_dependency_mutex); + g_all_task_info.erase(task_id); + return result::fail("Dependencies already failed"); + } else if (succeeded > 0 && (!dependencies.depend_all() || + succeeded >= dependent_infos.size())) { + new_dependencies.mutable_dependencies()->Clear(); + } + dependencies = new_dependencies; + return {}; +} + +bool DependencyManager::updateDependencies( + task_id_t task_id, crane::grpc::TaskStatus new_status, int exit_code, + std::unordered_map>* dependencies, + std::vector* success_tasks, + std::vector* failed_tasks) { + if (new_status == crane::grpc::TaskStatus::Pending) return {}; + + std::shared_ptr self; + { + LockGuard global_lock(&g_dependency_mutex); + auto it = g_all_task_info.find(task_id); + if (it == g_all_task_info.end()) { + return false; + } + self = it->second; + if (new_status != crane::grpc::TaskStatus::Running) { + // remove task from the global map to avoid adding dependents + g_all_task_info.erase(it); + } + } + + if (new_status == crane::grpc::TaskStatus::Running) { + clearList(self, crane::grpc::AFTER, Info::SUCCEED, dependencies, + success_tasks, failed_tasks); + } else if (new_status == crane::grpc::TaskStatus::Cancelled) { + clearList(self, crane::grpc::AFTER, Info::SUCCEED, dependencies, + success_tasks, failed_tasks); + clearList(self, crane::grpc::AFTER_ANY, Info::SUCCEED, dependencies, + success_tasks, failed_tasks); + clearList(self, crane::grpc::AFTER_OK, Info::FAILED, dependencies, + success_tasks, failed_tasks); + clearList(self, crane::grpc::AFTER_NOT_OK, Info::SUCCEED, dependencies, + success_tasks, failed_tasks); + } else if (new_status == crane::grpc::TaskStatus::Completed) { + clearList(self, crane::grpc::AFTER_ANY, Info::SUCCEED, dependencies, + success_tasks, failed_tasks); + clearList(self, crane::grpc::AFTER_OK, + exit_code == 0 ? Info::SUCCEED : Info::FAILED, dependencies, + success_tasks, failed_tasks); + clearList(self, crane::grpc::AFTER_NOT_OK, + exit_code == 0 ? Info::FAILED : Info::SUCCEED, dependencies, + success_tasks, failed_tasks); + } else if (new_status == crane::grpc::TaskStatus::Failed || + new_status == crane::grpc::TaskStatus::ExceedTimeLimit) { + clearList(self, crane::grpc::AFTER, Info::FAILED, dependencies, + success_tasks, failed_tasks); + clearList(self, crane::grpc::AFTER_ANY, Info::SUCCEED, dependencies, + success_tasks, failed_tasks); + clearList(self, crane::grpc::AFTER_OK, Info::FAILED, dependencies, + success_tasks, failed_tasks); + clearList(self, crane::grpc::AFTER_NOT_OK, Info::SUCCEED, dependencies, + success_tasks, failed_tasks); + } else { + CRANE_ERROR("Unknown task status: {}", static_cast(new_status)); + } + + return true; +} + +void DependencyManager::clearList( + const std::shared_ptr& info, crane::grpc::DependencyType type, + Info::ListStatus status, + std::unordered_map>* dependencies, + std::vector* success_tasks, + std::vector* failed_tasks) { + std::vector> to_update; + { + LockGuard lock(&info->mutex); + if (info->listStatus[type] != 0) return; + info->listStatus[type] = status; + std::swap(to_update, info->Dependents[type]); + } + for (auto& dep_info : to_update) { + if (dep_info->depend_all && status == Info::FAILED) { + failed_tasks->push_back(dep_info->task_id); + clearList(dep_info, crane::grpc::AFTER, Info::FAILED, dependencies, + success_tasks, failed_tasks); + clearList(dep_info, crane::grpc::AFTER_ANY, Info::SUCCEED, dependencies, + success_tasks, failed_tasks); + clearList(dep_info, crane::grpc::AFTER_OK, Info::FAILED, dependencies, + success_tasks, failed_tasks); + clearList(dep_info, crane::grpc::AFTER_NOT_OK, Info::SUCCEED, + dependencies, success_tasks, failed_tasks); + } else if (!dep_info->depend_all && status == Info::SUCCEED) { + success_tasks->push_back(dep_info->task_id); + } else { + (*dependencies)[dep_info->task_id].push_back(info->task_id); + } + } +} + +void DependencyManager::RecoverFromSnapshot( + std::unordered_map + pending_queue) { + for (const auto& [task_db_id, task] : pending_queue) { + const auto& runtime_attr = task.runtime_attr(); + const auto& task_to_ctld = task.task_to_ctld(); + task_id_t task_id = runtime_attr.task_id(); + const auto& submitted_dependencies = task_to_ctld.dependencies(); + const auto& updated_dependencies = runtime_attr.dependency_ids(); + if (g_all_task_info.find(task_id) == g_all_task_info.end()) { + g_all_task_info[task_id] = std::make_shared(); + g_all_task_info[task_id]->task_id = task_id; + } + g_all_task_info[task_id]->depend_all = submitted_dependencies.depend_all(); + if (submitted_dependencies.dependencies_size() == 0 || + runtime_attr.dependency_ok()) + continue; + std::unordered_map + rest_dependencies; + for (auto dep_info : submitted_dependencies.dependencies()) { + rest_dependencies.emplace(dep_info.task_id(), dep_info.type()); + } + for (auto dep_id : updated_dependencies) { + if (!rest_dependencies.erase(dep_id)) { + CRANE_ERROR("dependency #{} of Task#{} not exist.", dep_id, task_id); + continue; + } + } + if (rest_dependencies.empty()) { + CRANE_ERROR("Task #{} has no dependency rested but dependency unmet.", + task_id); + continue; + } + for (const auto& [dep_id, dep_type] : rest_dependencies) { + if (g_all_task_info.find(dep_id) == g_all_task_info.end()) { + g_all_task_info[dep_id] = std::make_shared(); + g_all_task_info[dep_id]->task_id = dep_id; + } + auto dep_info = g_all_task_info[dep_id]; + // If task status change has been written to db, then all changes to + // dependency are also written. So recovered dependencies can be added + // without check. + // While the list state still need to be set to avoid invalid new + // dependencies. this step is done in updateDependencies called by + // TaskScheduler::Init. + dep_info->Dependents[dep_type].push_back(g_all_task_info[task_id]); + } + } +} + +} // namespace Ctld \ No newline at end of file diff --git a/src/CraneCtld/DependencyManager.h b/src/CraneCtld/DependencyManager.h new file mode 100644 index 000000000..139f451a3 --- /dev/null +++ b/src/CraneCtld/DependencyManager.h @@ -0,0 +1,87 @@ +/** + * Copyright (c) 2023 Peking University and Peking University + * Changsha Institute for Computing and Digital Economy + * + * CraneSched is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of + * the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, + * WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +#pragma once + +#include + +#include "CtldPublicDefs.h" +#include "crane/PublicHeader.h" +#include "protos/PublicDefs.pb.h" + +namespace Ctld { + +class DependencyManager { + using Mutex = absl::Mutex; + using LockGuard = absl::MutexLock; + + template > + using HashMap = absl::flat_hash_map; + + struct Info { + enum ListStatus { + WAITING = 0, + SUCCEED = 1, + FAILED = 2, + }; + + task_id_t task_id; + bool depend_all; + + Mutex mutex; + std::vector> + Dependents[crane::grpc::DependencyType_ARRAYSIZE]; + ListStatus listStatus[crane::grpc::DependencyType_ARRAYSIZE]; + }; + + public: + DependencyManager(); + ~DependencyManager() = default; + + result::result addDependencies( + task_id_t task_id, crane::grpc::Dependencies& dependencies); + + // Update dependencies, return the list of meet dependencies(to remove from + // TaskInCtld), success tasks(to clear all dependencies), failed tasks(to + // change TaskStatus to FAILED) + bool updateDependencies( + task_id_t task_id, crane::grpc::TaskStatus new_status, int exit_code, + std::unordered_map>* dependencies, + std::vector* success_tasks, + std::vector* failed_tasks); + + // Will only be called by TaskScheduler::Init, lock is not needed + void RecoverFromSnapshot( + std::unordered_map + pending_queue); + + private: + void clearList( + const std::shared_ptr& info, crane::grpc::DependencyType type, + Info::ListStatus status, + std::unordered_map>* dependencies, + std::vector* success_tasks, + std::vector* failed_tasks); + + HashMap> g_all_task_info + GUARDED_BY(g_dependency_mutex); + Mutex g_dependency_mutex; +}; + +} // namespace Ctld + +inline std::unique_ptr g_dependency_manager; \ No newline at end of file diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 8670b401b..4a977dc00 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -23,6 +23,7 @@ #include "AccountManager.h" #include "CranedKeeper.h" #include "CranedMetaContainer.h" +#include "DependencyManager.h" #include "EmbeddedDbClient.h" namespace Ctld { @@ -63,8 +64,143 @@ bool TaskScheduler::Init() { return false; } + // Build pending_queue ahead to convient for finding task by task_id and + // insert failed task to db. + std::unordered_map> pending_queue; + if (!snapshot.pending_queue.empty()) { + g_dependency_manager->RecoverFromSnapshot(snapshot.pending_queue); + CRANE_INFO("{} pending task(s) recovered.", snapshot.pending_queue.size()); + + for (auto&& [task_db_id, task_in_embedded_db] : snapshot.pending_queue) { + auto task = std::make_unique(); + task->SetFieldsByTaskToCtld(task_in_embedded_db.task_to_ctld()); + // Must be called after SetFieldsByTaskToCtld! + task->SetFieldsByRuntimeAttr(task_in_embedded_db.runtime_attr()); + + task_id_t task_id = task->TaskId(); + + CRANE_TRACE("Restore task #{} from embedded pending queue.", + task->TaskId()); + + pending_queue.emplace(task_id, std::move(task)); + } + } + auto& running_queue = snapshot.running_queue; + // After update task status, calling this function to update dependency and + // both embedded_db and mongo_db. Task failed due to dependency will be + // removed from pending queue. + auto update_task_to_db = [&](const std::unique_ptr& task) { + std::unordered_map> dependencies; + std::vector dep_succ_task_vec; + std::vector dep_fail_task_vec; + bool ok = g_dependency_manager->updateDependencies( + task->TaskId(), task->Status(), 0, &dependencies, &dep_succ_task_vec, + &dep_fail_task_vec); + + std::vector dep_updated_task_raw_ptr_vec; + std::vector> dep_failed_task_ptr_vec; + for (auto& task_id : dep_fail_task_vec) { + auto it = pending_queue.find(task_id); + if (it != pending_queue.end()) { + CRANE_INFO( + "Failed to requeue task #{} due to dependency. Mark it as FAILED " + "and move it to the ended queue.", + task_id); + it->second->SetStatus(crane::grpc::Failed); + dep_failed_task_ptr_vec.emplace_back(std::move(it->second)); + pending_queue.erase(it); + } + } + for (auto& task_id : dep_succ_task_vec) { + auto it = pending_queue.find(task_id); + if (it != pending_queue.end()) { + it->second->SetDependencyOK(); + dep_updated_task_raw_ptr_vec.emplace_back(it->second.get()); + } + } + for (auto& [task_id, dep_ids] : dependencies) { + auto it = pending_queue.find(task_id); + if (it != pending_queue.end() && it->second->HasDependency()) { + it->second->DependencyAdd(dep_ids); + it->second->DependencyAdd(dep_ids); + if (it->second->NoWaitingDependency()) { + if (it->second->dependencies.depend_all()) { + it->second->SetDependencyOK(); + } else { + CRANE_INFO( + "Failed to requeue task #{} due to dependency. Mark it as " + "FAILED " + "and move it to the ended queue.", + task_id); + it->second->SetStatus(crane::grpc::Failed); + dep_failed_task_ptr_vec.emplace_back(std::move(it->second)); + pending_queue.erase(it); + continue; + } + } + dep_updated_task_raw_ptr_vec.emplace_back(it->second.get()); + } + } + + txn_id_t txn_id; + + ok = g_embedded_db_client->BeginVariableDbTransaction(&txn_id); + if (!ok) { + CRANE_ERROR( + "TaskScheduler failed to start transaction when mark the task #{} as " + "FAILED.", + task->TaskId()); + } else { + ok = g_embedded_db_client->UpdateRuntimeAttrOfTask( + txn_id, task->TaskDbId(), task->RuntimeAttr()); + for (auto& task : dep_updated_task_raw_ptr_vec) { + ok &= g_embedded_db_client->UpdateRuntimeAttrOfTask( + txn_id, task->TaskDbId(), task->RuntimeAttr()); + } + for (auto& task : dep_failed_task_ptr_vec) { + ok &= g_embedded_db_client->UpdateRuntimeAttrOfTask( + txn_id, task->TaskDbId(), task->RuntimeAttr()); + } + ok &= g_embedded_db_client->CommitVariableDbTransaction(txn_id); + + if (!ok) { + CRANE_ERROR( + "UpdateRuntimeAttrOfTask failed for task #{} when " + "mark the task as FAILED.", + task->TaskId()); + } + } + + std::vector ended_tasks; + for (const auto& task : dep_failed_task_ptr_vec) + ended_tasks.emplace_back(task.get()); + if (task->Status() == crane::grpc::Completed || + task->Status() == crane::grpc::Failed || + task->Status() == crane::grpc::ExceedTimeLimit || + task->Status() == crane::grpc::Cancelled) { + ended_tasks.emplace_back(task.get()); + } + + if (!ended_tasks.empty()) { + if (!g_db_client->InsertJobs(ended_tasks)) { + CRANE_ERROR("Failed to call g_db_client->InsertJobs() "); + return; + } + + std::vector db_ids; + for (TaskInCtld* task : ended_tasks) + db_ids.emplace_back(task->TaskDbId()); + + if (!g_embedded_db_client->PurgeEndedTasks(db_ids)) { + CRANE_ERROR( + "Failed to call g_embedded_db_client->PurgeEndedTasks() " + "for final tasks"); + } + } + }; + if (!running_queue.empty()) { CRANE_INFO("{} running task(s) recovered.", running_queue.size()); @@ -80,17 +216,11 @@ bool TaskScheduler::Init() { CRANE_TRACE("Restore task #{} from embedded running queue.", task->TaskId()); + std::vector> ended_task_ptr_vec; err = AcquireTaskAttributes(task.get()); if (err != CraneErr::kOk || task->type == crane::grpc::Interactive) { task->SetStatus(crane::grpc::Failed); - ok = g_embedded_db_client->UpdateRuntimeAttrOfTask(0, task_db_id, - task->RuntimeAttr()); - if (!ok) { - CRANE_ERROR( - "UpdateRuntimeAttrOfTask failed for task #{} when " - "mark the task as FAILED.", - task_id); - } + update_task_to_db(task); if (err != CraneErr::kOk) CRANE_INFO( "Failed to acquire task attributes for restored running task " @@ -110,23 +240,6 @@ bool TaskScheduler::Init() { craned_cgroups_map[craned_id].emplace_back(task->TaskId(), task->uid); } - - ok = g_db_client->InsertJob(task.get()); - if (!ok) { - CRANE_ERROR( - "InsertJob failed for task #{} " - "when recovering running queue.", - task->TaskId()); - } - - std::vector db_ids{task_db_id}; - ok = g_embedded_db_client->PurgeEndedTasks(db_ids); - if (!ok) { - CRANE_ERROR( - "PurgeEndedTasks failed for task #{} when recovering " - "running queue.", - task->TaskId()); - } } // Move this problematic task into ended queue and @@ -152,14 +265,7 @@ bool TaskScheduler::Init() { task->allocated_craneds_regex.clear(); task->CranedIdsClear(); - ok = g_embedded_db_client->UpdateRuntimeAttrOfTask(0, task->TaskDbId(), - task->RuntimeAttr()); - if (!ok) { - CRANE_ERROR( - "Failed to call " - "g_embedded_db_client->UpdateRuntimeAttrOfTask()"); - } - + update_task_to_db(task); // Now the task is moved to the embedded pending queue. RequeueRecoveredTaskIntoPendingQueueLock_(std::move(task)); } else { @@ -167,13 +273,7 @@ bool TaskScheduler::Init() { err = stub->CheckTaskStatus(task->TaskId(), &status); if (err == CraneErr::kOk) { task->SetStatus(status); - ok = g_embedded_db_client->UpdateRuntimeAttrOfTask( - 0, task->TaskDbId(), task->RuntimeAttr()); - if (!ok) { - CRANE_ERROR( - "Failed to call " - "g_embedded_db_client->UpdateRuntimeAttrOfTask()"); - } + update_task_to_db(task); if (status == crane::grpc::Running) { // Exec node is up and the task is running. // Just allocate resource from allocated nodes and @@ -212,23 +312,6 @@ bool TaskScheduler::Init() { craned_cgroups_map[craned_id].emplace_back(task->TaskId(), task->uid); } - - ok = g_db_client->InsertJob(task.get()); - if (!ok) { - CRANE_ERROR( - "InsertJob failed for task #{} " - "when recovering running queue.", - task->TaskId()); - } - - std::vector db_ids{task_db_id}; - ok = g_embedded_db_client->PurgeEndedTasks(db_ids); - if (!ok) { - CRANE_ERROR( - "PurgeEndedTasks failed for task #{} when recovering " - "running queue.", - task->TaskId()); - } } } else { // Exec node is up but task id does not exist. @@ -259,13 +342,7 @@ bool TaskScheduler::Init() { task->allocated_craneds_regex.clear(); task->CranedIdsClear(); - ok = g_embedded_db_client->UpdateRuntimeAttrOfTask( - 0, task->TaskDbId(), task->RuntimeAttr()); - if (!ok) { - CRANE_ERROR( - "Failed to call " - "g_embedded_db_client->UpdateRuntimeAttrOfTask()"); - } + update_task_to_db(task); // Now the task is moved to the embedded pending queue. RequeueRecoveredTaskIntoPendingQueueLock_(std::move(task)); } @@ -292,21 +369,10 @@ bool TaskScheduler::Init() { } // Process the pending tasks in the embedded pending queue. - auto& pending_queue = snapshot.pending_queue; if (!pending_queue.empty()) { - CRANE_INFO("{} pending task(s) recovered.", pending_queue.size()); - - for (auto&& [task_db_id, task_in_embedded_db] : pending_queue) { - auto task = std::make_unique(); - task->SetFieldsByTaskToCtld(task_in_embedded_db.task_to_ctld()); - // Must be called after SetFieldsByTaskToCtld! - task->SetFieldsByRuntimeAttr(task_in_embedded_db.runtime_attr()); - - task_id_t task_id = task->TaskId(); - - CRANE_TRACE("Restore task #{} from embedded pending queue.", - task->TaskId()); - + for (auto it = pending_queue.begin(); it != pending_queue.end();) { + auto& task_id = it->first; + auto& task = it->second; bool mark_task_as_failed = false; if (task->type != crane::grpc::Batch) { @@ -326,44 +392,26 @@ bool TaskScheduler::Init() { mark_task_as_failed = true; } - if (!mark_task_as_failed) { - RequeueRecoveredTaskIntoPendingQueueLock_(std::move(task)); - } else { - // If a batch task failed to requeue the task into pending queue due to - // insufficient resource or other reasons or the task is an interactive - // task, Mark it as FAILED and move it to the ended queue. + if (mark_task_as_failed) { + // If a batch task failed to requeue the task into pending queue due + // to insufficient resource or other reasons or the task is an + // interactive task, Mark it as FAILED and move it to the ended queue. CRANE_INFO( "Failed to requeue task #{}. Mark it as FAILED and " "move it to the ended queue.", task_id); task->SetStatus(crane::grpc::Failed); - ok = g_embedded_db_client->UpdateRuntimeAttrOfTask(0, task_db_id, - task->RuntimeAttr()); - if (!ok) { - CRANE_ERROR( - "UpdateRuntimeAttrOfTask failed for task #{} when " - "mark the task as FAILED.", - task_id); - } - - ok = g_db_client->InsertJob(task.get()); - if (!ok) { - CRANE_ERROR( - "InsertJob failed for task #{} when recovering pending " - "queue.", - task->TaskId()); - } - - std::vector db_ids{task_db_id}; - ok = g_embedded_db_client->PurgeEndedTasks(db_ids); - if (!ok) { - CRANE_ERROR( - "PurgeEndedTasks failed for task #{} when recovering " - "pending queue.", - task->TaskId()); - } + std::vector> ended_task_ptr_vec; + update_task_to_db(task); + it = pending_queue.erase(it); + } else { + it = std::next(it); } } + + for (auto&& [task_id, task] : pending_queue) { + RequeueRecoveredTaskIntoPendingQueueLock_(std::move(task)); + } } if (!snapshot.final_queue.empty()) { @@ -834,43 +882,90 @@ void TaskScheduler::ScheduleThread_() { CranedStub::NewExecuteTasksRequests(craned_id, tasks_raw_ptrs); } - // Move tasks into running queue. - txn_id_t txn_id{0}; - bool ok = g_embedded_db_client->BeginVariableDbTransaction(&txn_id); - if (!ok) { - CRANE_ERROR( - "TaskScheduler failed to start transaction when scheduling."); - } + { + // Update dependencies due to the execution of tasks. + // updated_task_raw_ptrs will not contain duplicated tasks. + std::vector updated_task_raw_ptrs; + updated_task_raw_ptrs.reserve(selection_result_list.size()); + std::unordered_map> dependencies; + std::vector dep_succ_task_vec; - for (auto& it : selection_result_list) { - auto& task = it.first; + for (auto& it : selection_result_list) { + auto& task = it.first; + bool ok = g_dependency_manager->updateDependencies( + task->TaskId(), crane::grpc::TaskStatus::Running, 0, + &dependencies, &dep_succ_task_vec, + nullptr); // start a task never fails any dependency + if (!ok) { + CRANE_ERROR("Failed to update dependencies for task #{}.", + task->TaskId()); + } + updated_task_raw_ptrs.emplace_back(task.get()); + } - // IMPORTANT: task must be put into running_task_map before any - // time-consuming operation, otherwise TaskStatusChange RPC will come - // earlier before task is put into running_task_map. - g_embedded_db_client->UpdateRuntimeAttrOfTask(txn_id, task->TaskDbId(), - task->RuntimeAttr()); - } + if (!(dependencies.empty() && dep_succ_task_vec.empty())) { + LockGuard pending_guard(&m_pending_task_map_mtx_); + for (auto& task_id : dep_succ_task_vec) { + auto it = m_pending_task_map_.find(task_id); + if (it != m_pending_task_map_.end()) { + it->second->SetDependencyOK(); + updated_task_raw_ptrs.emplace_back(it->second.get()); + } + } + for (auto& [task_id, dep_ids] : dependencies) { + auto it = m_pending_task_map_.find(task_id); + if (it != m_pending_task_map_.end() && + it->second->HasDependency()) { + it->second->DependencyAdd(dep_ids); + if (it->second->NoWaitingDependency()) { + if (it->second->dependencies.depend_all()) { + it->second->SetDependencyOK(); + } else { + CRANE_ERROR("Task #{} Failed when starting other tasks.", + task_id); + } + } + updated_task_raw_ptrs.emplace_back(it->second.get()); + } + } + } - ok = g_embedded_db_client->CommitVariableDbTransaction(txn_id); - if (!ok) { - CRANE_ERROR("Embedded database failed to commit manual transaction."); - } + // Move tasks into running queue. + txn_id_t txn_id{0}; + bool ok = g_embedded_db_client->BeginVariableDbTransaction(&txn_id); + if (!ok) { + CRANE_ERROR( + "TaskScheduler failed to start transaction when scheduling."); + } - // Set succeed tasks status and do callbacks. - for (auto& it : selection_result_list) { - auto& task = it.first; - if (task->type == crane::grpc::Interactive) { - const auto& meta = std::get(task->meta); - std::get(task->meta) - .cb_task_res_allocated(task->TaskId(), - task->allocated_craneds_regex, - task->CranedIds()); + for (auto& task : updated_task_raw_ptrs) { + // IMPORTANT: task must be put into running_task_map before any + // time-consuming operation, otherwise TaskStatusChange RPC will + // come earlier before task is put into running_task_map. + g_embedded_db_client->UpdateRuntimeAttrOfTask( + txn_id, task->TaskDbId(), task->RuntimeAttr()); } - m_running_task_map_mtx_.Lock(); - m_running_task_map_.emplace(task->TaskId(), std::move(task)); - m_running_task_map_mtx_.Unlock(); + ok = g_embedded_db_client->CommitVariableDbTransaction(txn_id); + if (!ok) { + CRANE_ERROR("Embedded database failed to commit manual transaction."); + } + + // Set succeed tasks status and do callbacks. + for (auto& it : selection_result_list) { + auto& task = it.first; + if (task->type == crane::grpc::Interactive) { + const auto& meta = std::get(task->meta); + std::get(task->meta) + .cb_task_res_allocated(task->TaskId(), + task->allocated_craneds_regex, + task->CranedIds()); + } + + m_running_task_map_mtx_.Lock(); + m_running_task_map_.emplace(task->TaskId(), std::move(task)); + m_running_task_map_mtx_.Unlock(); + } } end = std::chrono::steady_clock::now(); @@ -963,6 +1058,12 @@ void TaskScheduler::ScheduleThread_() { // Move failed tasks to the completed queue. std::vector failed_task_raw_ptrs; + std::unordered_map> dependencies; + std::vector dep_succ_task_vec; + std::vector dep_fail_task_vec; + std::vector updated_task_raw_ptrs; + std::vector> failed_tasks; + for (auto& it : failed_result_list) { auto& task = it.first; failed_task_raw_ptrs.emplace_back(task.get()); @@ -970,13 +1071,78 @@ void TaskScheduler::ScheduleThread_() { task->SetStatus(crane::grpc::Failed); task->SetExitCode(ExitCode::kExitCodeCgroupError); task->SetEndTime(absl::Now()); + bool ok = g_dependency_manager->updateDependencies( + task->TaskId(), crane::grpc::TaskStatus::Failed, 0, &dependencies, + &dep_succ_task_vec, &dep_fail_task_vec); + if (!ok) { + CRANE_ERROR("Failed to update dependencies for task #{}.", + task->TaskId()); + } } + + if (!(dependencies.empty() && dep_succ_task_vec.empty() && + dep_fail_task_vec.empty())) { + LockGuard pending_guard(&m_pending_task_map_mtx_); + for (auto& task_id : dep_fail_task_vec) { + auto it = m_pending_task_map_.find(task_id); + if (it != m_pending_task_map_.end()) { + auto& task = it->second; + task->SetStatus(crane::grpc::Failed); + if (task->type != crane::grpc::Batch) { + auto& meta = std::get(task->meta); + g_thread_pool->detach_task( + [cb = meta.cb_task_cancel, task_id = task_id] { + cb(task_id); + }); + } + failed_task_raw_ptrs.emplace_back(task.get()); + failed_tasks.emplace_back(std::move(task)); + m_pending_task_map_.erase(it); + } + } + for (auto& task_id : dep_succ_task_vec) { + auto it = m_pending_task_map_.find(task_id); + if (it != m_pending_task_map_.end()) { + it->second->SetDependencyOK(); + updated_task_raw_ptrs.emplace_back(it->second.get()); + } + } + for (auto& [task_id, dep_ids] : dependencies) { + auto it = m_pending_task_map_.find(task_id); + auto& task = it->second; + if (it != m_pending_task_map_.end() && task->HasDependency()) { + task->DependencyAdd(dep_ids); + if (task->NoWaitingDependency()) { + if (task->dependencies.depend_all()) { + task->SetDependencyOK(); + } else { + task->SetStatus(crane::grpc::Failed); + if (task->type != crane::grpc::Batch) { + auto& meta = std::get(task->meta); + g_thread_pool->detach_task( + [cb = meta.cb_task_cancel, task_id = task_id] { + cb(task_id); + }); + } + failed_task_raw_ptrs.emplace_back(task.get()); + failed_tasks.emplace_back(std::move(task)); + m_pending_task_map_.erase(it); + continue; + } + } + updated_task_raw_ptrs.emplace_back(task.get()); + } + } + } + // TODO: Add MovePendingToFinal // TODO: Add crun callback here! - PersistAndTransferTasksToMongodb_(failed_task_raw_ptrs); + PersistAndTransferTasksToMongodb_(failed_task_raw_ptrs, + updated_task_raw_ptrs); // Failed tasks have been handled properly. Free them explicitly. failed_result_list.clear(); + failed_tasks.clear(); end = std::chrono::steady_clock::now(); CRANE_TRACE( @@ -1000,10 +1166,11 @@ void TaskScheduler::SetNodeSelectionAlgo( m_node_selection_algo_ = std::move(algo); } -std::future TaskScheduler::SubmitTaskAsync( +std::future> TaskScheduler::SubmitTaskAsync( std::unique_ptr task) { - std::promise promise; - std::future future = promise.get_future(); + std::promise> promise; + std::future> future = + promise.get_future(); m_submit_task_queue_.enqueue({std::move(task), std::move(promise)}); m_submit_task_async_handle_->send(); @@ -1146,8 +1313,8 @@ crane::grpc::CancelTaskReply TaskScheduler::CancelPendingOrRunningTask( uint32_t operator_uid = request.operator_uid(); - // When an ordinary user tries to cancel jobs, they are automatically filtered - // to their own jobs. + // When an ordinary user tries to cancel jobs, they are automatically + // filtered to their own jobs. std::string filter_uname = request.filter_username(); if (filter_uname.empty() && g_account_manager->CheckUidIsAdmin(operator_uid).has_error()) { @@ -1441,7 +1608,72 @@ void TaskScheduler::CleanCancelQueueCb_() { } } - PersistAndTransferTasksToMongodb_(task_raw_ptr_vec); + std::unordered_map> dependencies; + std::vector dep_succ_task_vec; + std::vector dep_fail_task_vec; + for (auto& task : task_ptr_vec) { + bool ok = g_dependency_manager->updateDependencies( + task->TaskId(), crane::grpc::TaskStatus::Cancelled, 0, &dependencies, + &dep_succ_task_vec, &dep_fail_task_vec); + if (!ok) { + CRANE_ERROR("Failed to update dependencies for task #{}.", + task->TaskId()); + } + } + std::vector updated_raw_ptr_vec; + + if (!(dependencies.empty() && dep_succ_task_vec.empty() && + dep_fail_task_vec.empty())) { + LockGuard pending_guard(&m_pending_task_map_mtx_); + for (auto& task_id : dep_fail_task_vec) { + auto it = m_pending_task_map_.find(task_id); + if (it != m_pending_task_map_.end()) { + auto& task = it->second; + task->SetStatus(crane::grpc::Failed); + if (task->type != crane::grpc::Batch) { + auto& meta = std::get(task->meta); + g_thread_pool->detach_task( + [cb = meta.cb_task_cancel, task_id = task_id] { cb(task_id); }); + } + task_raw_ptr_vec.emplace_back(task.get()); + task_ptr_vec.emplace_back(std::move(task)); + m_pending_task_map_.erase(it); + } + } + for (auto& task_id : dep_succ_task_vec) { + auto it = m_pending_task_map_.find(task_id); + if (it != m_pending_task_map_.end()) { + it->second->SetDependencyOK(); + updated_raw_ptr_vec.emplace_back(it->second.get()); + } + } + for (auto& [task_id, dep_ids] : dependencies) { + auto it = m_pending_task_map_.find(task_id); + if (it != m_pending_task_map_.end() && it->second->HasDependency()) { + auto& task = it->second; + task->DependencyAdd(dep_ids); + if (task->NoWaitingDependency()) { + if (task->dependencies.depend_all()) { + task->SetDependencyOK(); + } else { + task->SetStatus(crane::grpc::Failed); + if (task->type != crane::grpc::Batch) { + auto& meta = std::get(task->meta); + g_thread_pool->detach_task([cb = meta.cb_task_cancel, + task_id = task_id] { cb(task_id); }); + } + task_raw_ptr_vec.emplace_back(task.get()); + task_ptr_vec.emplace_back(std::move(task)); + m_pending_task_map_.erase(it); + continue; + } + } + updated_raw_ptr_vec.emplace_back(task.get()); + } + } + } + + PersistAndTransferTasksToMongodb_(task_raw_ptr_vec, updated_raw_ptr_vec); } void TaskScheduler::SubmitTaskTimerCb_() { @@ -1455,7 +1687,8 @@ void TaskScheduler::SubmitTaskAsyncCb_() { void TaskScheduler::CleanSubmitQueueCb_() { using SubmitQueueElem = - std::pair, std::promise>; + std::pair, + std::promise>>; // It's ok to use an approximate size. size_t approximate_size = m_submit_task_queue_.size_approx(); @@ -1503,10 +1736,13 @@ void TaskScheduler::CleanSubmitQueueCb_() { if (!g_embedded_db_client->AppendTasksToPendingAndAdvanceTaskIds( accepted_task_ptrs)) { CRANE_ERROR("Failed to append a batch of tasks to embedded db queue."); - for (auto& pair : accepted_tasks) pair.second /*promise*/.set_value(0); + for (auto& pair : accepted_tasks) + pair.second /*promise*/.set_value( + result::fail(CraneErr::kEmbeddedDbError)); break; } + std::vector failed_task_db_ids; m_pending_task_map_mtx_.Lock(); for (uint32_t i = 0; i < accepted_tasks.size(); i++) { @@ -1514,6 +1750,13 @@ void TaskScheduler::CleanSubmitQueueCb_() { task_id_t id = accepted_tasks[pos].first->TaskId(); auto& task_id_promise = accepted_tasks[pos].second; + auto result = g_dependency_manager->addDependencies( + id, accepted_tasks[pos].first->dependencies); + if (result.has_error()) { + task_id_promise.set_value(result::fail(CraneErr::kDependencyError)); + failed_task_db_ids.push_back(accepted_tasks[pos].first->TaskDbId()); + continue; + } m_pending_task_map_.emplace(id, std::move(accepted_tasks[pos].first)); task_id_promise.set_value(id); } @@ -1521,6 +1764,8 @@ void TaskScheduler::CleanSubmitQueueCb_() { m_pending_map_cached_size_.store(m_pending_task_map_.size(), std::memory_order_release); m_pending_task_map_mtx_.Unlock(); + + g_embedded_db_client->PurgeEndedTasks(failed_task_db_ids); } while (false); // Reject tasks beyond queue capacity @@ -1534,7 +1779,8 @@ void TaskScheduler::CleanSubmitQueueCb_() { CRANE_TRACE("Rejecting {} tasks...", rejected_actual_size); for (size_t i = 0; i < rejected_actual_size; i++) - rejected_tasks[i].second.set_value(0); + rejected_tasks[i].second.set_value( + result::fail(CraneErr::kPendingQueueLimit)); } while (false); } @@ -1577,10 +1823,12 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() { std::unordered_map>> craned_cgroups_map; - LockGuard running_guard(&m_running_task_map_mtx_); - LockGuard indexes_guard(&m_task_indexes_mtx_); + auto now = absl::Now(); - for (const auto& [task_id, exit_code, new_status, craned_index] : args) { + m_running_task_map_mtx_.Lock(); + m_task_indexes_mtx_.Lock(); + + for (auto& [task_id, exit_code, new_status, craned_index] : args) { auto iter = m_running_task_map_.find(task_id); if (iter == m_running_task_map_.end()) { CRANE_WARN( @@ -1626,7 +1874,15 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() { } task->SetExitCode(exit_code); - task->SetEndTime(absl::Now()); + task->SetEndTime(now); + + if (new_status == crane::grpc::Completed && + task->type != crane::grpc::Batch) { + auto& meta = std::get(task->meta); + if (meta.interactive_type == crane::grpc::Calloc) { + exit_code = 0; + } + } for (CranedId const& craned_id : task->CranedIds()) { craned_cgroups_map[craned_id].emplace_back(task_id, task->uid); @@ -1659,6 +1915,9 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() { m_running_task_map_.erase(iter); } + m_running_task_map_mtx_.Unlock(); + m_task_indexes_mtx_.Unlock(); + absl::BlockingCounter bl(craned_cgroups_map.size()); for (const auto& [craned_id, cgroups] : craned_cgroups_map) { g_thread_pool->detach_task([&bl, &craned_id, &cgroups]() { @@ -1677,7 +1936,72 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() { } bl.Wait(); - PersistAndTransferTasksToMongodb_(task_raw_ptr_vec); + std::unordered_map> dependencies; + std::vector dep_succ_task_vec; + std::vector dep_fail_task_vec; + for (const auto& [task_id, exit_code, new_status, craned_index] : args) { + bool ok = g_dependency_manager->updateDependencies( + task_id, new_status, exit_code, &dependencies, &dep_succ_task_vec, + &dep_fail_task_vec); + if (!ok) { + CRANE_ERROR("Failed to update dependencies for task #{}.", task_id); + } + } + + std::vector updated_raw_ptr_vec; + + if (!(dependencies.empty() && dep_succ_task_vec.empty() && + dep_fail_task_vec.empty())) { + LockGuard pending_guard(&m_pending_task_map_mtx_); + for (auto& task_id : dep_fail_task_vec) { + auto it = m_pending_task_map_.find(task_id); + if (it != m_pending_task_map_.end()) { + auto& task = it->second; + task->SetStatus(crane::grpc::Failed); + if (task->type != crane::grpc::Batch) { + auto& meta = std::get(task->meta); + g_thread_pool->detach_task( + [cb = meta.cb_task_cancel, task_id = task_id] { cb(task_id); }); + } + task_raw_ptr_vec.emplace_back(task.get()); + task_ptr_vec.emplace_back(std::move(task)); + m_pending_task_map_.erase(it); + } + } + for (auto& task_id : dep_succ_task_vec) { + auto it = m_pending_task_map_.find(task_id); + if (it != m_pending_task_map_.end()) { + it->second->SetDependencyOK(); + updated_raw_ptr_vec.emplace_back(it->second.get()); + } + } + for (auto& [task_id, dep_ids] : dependencies) { + auto it = m_pending_task_map_.find(task_id); + if (it != m_pending_task_map_.end() && it->second->HasDependency()) { + auto& task = it->second; + task->DependencyAdd(dep_ids); + if (task->NoWaitingDependency()) { + if (task->dependencies.depend_all()) { + task->SetDependencyOK(); + } else { + task->SetStatus(crane::grpc::Failed); + if (task->type != crane::grpc::Batch) { + auto& meta = std::get(task->meta); + g_thread_pool->detach_task([cb = meta.cb_task_cancel, + task_id = task_id] { cb(task_id); }); + } + task_raw_ptr_vec.emplace_back(task.get()); + task_ptr_vec.emplace_back(std::move(task)); + m_pending_task_map_.erase(it); + continue; + } + } + updated_raw_ptr_vec.emplace_back(task.get()); + } + } + } + + PersistAndTransferTasksToMongodb_(task_raw_ptr_vec, updated_raw_ptr_vec); } void TaskScheduler::QueryTasksInRam( @@ -2587,12 +2911,19 @@ void MinLoadFirst::SubtractTaskResourceNodeSelectionInfo_( } void TaskScheduler::PersistAndTransferTasksToMongodb_( - std::vector const& tasks) { - if (tasks.empty()) return; + std::vector const& ended_tasks, + std::vector const& updated_tasks) { + if (ended_tasks.empty()) return; txn_id_t txn_id; g_embedded_db_client->BeginVariableDbTransaction(&txn_id); - for (TaskInCtld* task : tasks) { + for (TaskInCtld* task : ended_tasks) { + if (!g_embedded_db_client->UpdateRuntimeAttrOfTask(txn_id, task->TaskDbId(), + task->RuntimeAttr())) + CRANE_ERROR("Failed to call UpdateRuntimeAttrOfTask() for task #{}", + task->TaskId()); + } + for (TaskInCtld* task : updated_tasks) { if (!g_embedded_db_client->UpdateRuntimeAttrOfTask(txn_id, task->TaskDbId(), task->RuntimeAttr())) CRANE_ERROR("Failed to call UpdateRuntimeAttrOfTask() for task #{}", @@ -2602,14 +2933,14 @@ void TaskScheduler::PersistAndTransferTasksToMongodb_( g_embedded_db_client->CommitVariableDbTransaction(txn_id); // Now tasks are in MongoDB. - if (!g_db_client->InsertJobs(tasks)) { + if (!g_db_client->InsertJobs(ended_tasks)) { CRANE_ERROR("Failed to call g_db_client->InsertJobs() "); return; } // Remove tasks in final queue. std::vector db_ids; - for (TaskInCtld* task : tasks) db_ids.emplace_back(task->TaskDbId()); + for (TaskInCtld* task : ended_tasks) db_ids.emplace_back(task->TaskDbId()); if (!g_embedded_db_client->PurgeEndedTasks(db_ids)) { CRANE_ERROR( @@ -2776,6 +3107,10 @@ std::vector MultiFactorPriority::GetOrderedTaskIdList( task->pending_reason = "Held"; continue; } + if (task->HasDependency()) { + task->pending_reason = "Dependency"; + continue; + } // Admin may manually specify the priority of a task. // In this case, MultiFactorPriority will not calculate the priority. double priority = (task->mandated_priority == 0.0) diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index 6bb2df619..309ed6d13 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -222,7 +222,8 @@ class TaskScheduler { /// \return The future is set to 0 if task submission is failed. /// Otherwise, it is set to newly allocated task id. - std::future SubmitTaskAsync(std::unique_ptr task); + std::future> SubmitTaskAsync( + std::unique_ptr task); std::future HoldReleaseTaskAsync(task_id_t task_id, int64_t secs); @@ -289,7 +290,8 @@ class TaskScheduler { void PutRecoveredTaskIntoRunningQueueLock_(std::unique_ptr task); static void PersistAndTransferTasksToMongodb_( - std::vector const& tasks); + std::vector const& tasks, + std::vector const& updated_tasks); CraneErr TerminateRunningTaskNoLock_(TaskInCtld* task); @@ -366,8 +368,8 @@ class TaskScheduler { void SubmitTaskTimerCb_(); std::shared_ptr m_submit_task_async_handle_; - ConcurrentQueue< - std::pair, std::promise>> + ConcurrentQueue, + std::promise>>> m_submit_task_queue_; void SubmitTaskAsyncCb_(); diff --git a/src/Utilities/PublicHeader/include/crane/PublicHeader.h b/src/Utilities/PublicHeader/include/crane/PublicHeader.h index 78c0b0be5..4382f7db5 100644 --- a/src/Utilities/PublicHeader/include/crane/PublicHeader.h +++ b/src/Utilities/PublicHeader/include/crane/PublicHeader.h @@ -57,6 +57,10 @@ enum class CraneErr : uint16_t { kLibEventError, kNoAvailNode, + kDependencyError, + kEmbeddedDbError, + kPendingQueueLimit, + __ERR_SIZE // NOLINT(bugprone-reserved-identifier) }; @@ -139,6 +143,10 @@ constexpr std::array "Error when using protobuf", "Error when using LibEvent", "Not enough nodes which satisfy resource requirements", + + "Dependency task does not exist or has ended", + "Error when append task to embedded db queue", + "Too much pending tasks", }; }