Skip to content

Commit

Permalink
feat: dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
NamelessOIer committed Aug 24, 2024
1 parent 1440e75 commit 9c2353e
Show file tree
Hide file tree
Showing 11 changed files with 929 additions and 191 deletions.
25 changes: 25 additions & 0 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,23 @@ enum InteractiveTaskType {
Crun = 1;
}

// Kind of relation a task can have to another task it depends on.
// NOTE(review): the value names resemble Slurm's --dependency types
// (after / afterany / afterok / afternotok); the exact semantics are
// implemented outside this file -- confirm before relying on them.
enum DependencyType {
// Zero value, i.e. what an unset DependencyCondition.type reads as.
AFTER = 0;
AFTER_ANY = 1;
AFTER_OK = 2;
AFTER_NOT_OK = 3;
}

// A single dependency condition: a task id paired with a DependencyType.
message DependencyCondition {
// Presumably the id of the task being depended on -- TODO confirm.
uint32 task_id = 1;
// Relation required with respect to that task.
DependencyType type = 2;
}

// The list of dependency conditions attached to a task; carried in
// TaskToCtld.dependencies.
message Dependencies{
// Individual conditions.
repeated DependencyCondition dependencies = 1;
// NOTE(review): presumably true means every condition must be satisfied and
// false means any single condition suffices -- confirm against the
// dependency handling code.
bool depend_all = 2;
}

message TaskToCtld {
/* -------- Fields that are set at the submission time. ------- */
google.protobuf.Duration time_limit = 1;
Expand All @@ -128,6 +145,8 @@ message TaskToCtld {

bool requeue_if_failed = 12;
bool get_user_env = 13;

Dependencies dependencies = 14;

oneof payload {
BatchTaskAdditionalMeta batch_meta = 21;
Expand Down Expand Up @@ -168,6 +187,11 @@ message RuntimeAttrOfTask {

bool held = 18;
ResourceV2 resources = 19;
bool dependency_ok = 20;
// If this task depends on all of its dependencies, stores the satisfied ones.
// If this task depends on any one of its dependencies, stores the
// unsatisfied ones.
// The task ids must be stored so that this state can be restored.
repeated uint32 dependency_ids = 21;
}

message TaskToD {
Expand Down Expand Up @@ -249,6 +273,7 @@ message TaskInfo {
string extra_attr = 20;

// Dynamic task information
uint32 dependency_state = 29;
bool held = 30;
TaskStatus status = 31;

Expand Down
2 changes: 2 additions & 0 deletions src/CraneCtld/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ add_executable(cranectld
CranedMetaContainer.cpp
AccountManager.h
AccountManager.cpp
DependencyManager.h
DependencyManager.cpp
EmbeddedDbClient.cpp
EmbeddedDbClient.h
CraneCtld.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/CraneCtld/CraneCtld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "CtldGrpcServer.h"
#include "CtldPublicDefs.h"
#include "DbClient.h"
#include "DependencyManager.h"
#include "EmbeddedDbClient.h"
#include "TaskScheduler.h"
#include "crane/Logger.h"
Expand Down Expand Up @@ -626,6 +627,7 @@ void DestroyCtldGlobalVariables() {

g_task_scheduler.reset();
g_craned_keeper.reset();
g_dependency_manager.reset();

g_plugin_client.reset();

Expand Down Expand Up @@ -755,6 +757,8 @@ void InitializeCtldGlobalVariables() {
}
}

g_dependency_manager = std::make_unique<DependencyManager>();

g_task_scheduler = std::make_unique<TaskScheduler>();
ok = g_task_scheduler->Init();
if (!ok) {
Expand Down
37 changes: 24 additions & 13 deletions src/CraneCtld/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTask(

auto result = m_ctld_server_->SubmitTaskToScheduler(std::move(task));
if (result.has_value()) {
task_id_t id = result.value().get();
if (id != 0) {
auto submit_result = result.value().get();
if (submit_result.has_value()) {
response->set_ok(true);
task_id_t id = submit_result.value();
response->set_task_id(id);
} else {
response->set_ok(false);
response->set_reason(
"System error occurred or "
"the number of pending tasks exceeded maximum value.");
response->set_reason(std::string(CraneErrStr(submit_result.error())));
}
} else {
response->set_ok(false);
Expand All @@ -58,7 +57,9 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTasks(
grpc::ServerContext *context,
const crane::grpc::SubmitBatchTasksRequest *request,
crane::grpc::SubmitBatchTasksReply *response) {
std::vector<result::result<std::future<task_id_t>, std::string>> results;
std::vector<result::result<std::future<result::result<task_id_t, CraneErr>>,
std::string>>
results;

uint32_t task_count = request->count();
const auto &task_to_ctld = request->task();
Expand All @@ -73,9 +74,14 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTasks(
}

for (auto &res : results) {
if (res.has_value())
response->mutable_task_id_list()->Add(res.value().get());
else
if (res.has_value()) {
auto submit_res = res.value().get();
if (submit_res.has_value())
response->mutable_task_id_list()->Add(submit_res.value());
else
response->mutable_reason_list()->Add(
std::string(CraneErrStr(submit_res.error())));
} else
response->mutable_reason_list()->Add(res.error());
}

Expand Down Expand Up @@ -1086,8 +1092,13 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
m_ctld_server_->SubmitTaskToScheduler(std::move(task));
result::result<task_id_t, std::string> result;
if (submit_result.has_value()) {
result = result::result<task_id_t, std::string>{
submit_result.value().get()};
auto submit_final_result = submit_result.value().get();
if (submit_final_result.has_value()) {
result = result::result<task_id_t, std::string>{
submit_final_result.value()};
} else {
result = result::fail(CraneErrStr(submit_final_result.error()));
}
} else {
result = result::fail(submit_result.error());
}
Expand Down Expand Up @@ -1210,7 +1221,7 @@ CtldServer::CtldServer(const Config::CraneCtldListenConf &listen_conf) {
signal(SIGINT, &CtldServer::signal_handler_func);
}

result::result<std::future<task_id_t>, std::string>
result::result<std::future<result::result<task_id_t, CraneErr>>, std::string>
CtldServer::SubmitTaskToScheduler(std::unique_ptr<TaskInCtld> task) {
CraneErr err;

Expand Down Expand Up @@ -1260,7 +1271,7 @@ CtldServer::SubmitTaskToScheduler(std::unique_ptr<TaskInCtld> task) {

if (err == CraneErr::kOk) {
task->SetSubmitTime(absl::Now());
std::future<task_id_t> future =
std::future<result::result<task_id_t, CraneErr>> future =
g_task_scheduler->SubmitTaskAsync(std::move(task));
return {std::move(future)};
}
Expand Down
27 changes: 17 additions & 10 deletions src/CraneCtld/CtldGrpcServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ class CforedStreamWriter {
crane::grpc::StreamCforedRequest> *stream)
: m_stream_(stream), m_valid_(true) {}

bool WriteTaskIdReply(
pid_t calloc_pid,
result::result<task_id_t, std::string> res) {
bool WriteTaskIdReply(pid_t calloc_pid,
result::result<task_id_t, std::string> res) {
LockGuard guard(&m_stream_mtx_);
if (!m_valid_) return false;

Expand All @@ -64,8 +63,11 @@ class CforedStreamWriter {
return m_stream_->Write(reply);
}

bool WriteTaskResAllocReply(task_id_t task_id,
result::result<std::pair<std::string,std::list<std::string>>, std::string> res) {
bool WriteTaskResAllocReply(
task_id_t task_id,
result::result<std::pair<std::string, std::list<std::string>>,
std::string>
res) {
LockGuard guard(&m_stream_mtx_);
if (!m_valid_) return false;

Expand All @@ -76,8 +78,12 @@ class CforedStreamWriter {

if (res.has_value()) {
task_res_alloc_reply->set_ok(true);
task_res_alloc_reply->set_allocated_craned_regex(std::move(res.value().first));
std::ranges::for_each(res.value().second,[&task_res_alloc_reply](const auto& craned_id){task_res_alloc_reply->add_craned_ids(craned_id);});
task_res_alloc_reply->set_allocated_craned_regex(
std::move(res.value().first));
std::ranges::for_each(res.value().second,
[&task_res_alloc_reply](const auto &craned_id) {
task_res_alloc_reply->add_craned_ids(craned_id);
});
} else {
task_res_alloc_reply->set_ok(false);
task_res_alloc_reply->set_failure_reason(std::move(res.error()));
Expand All @@ -89,7 +95,8 @@ class CforedStreamWriter {
bool WriteTaskCompletionAckReply(task_id_t task_id) {
LockGuard guard(&m_stream_mtx_);
if (!m_valid_) return false;
CRANE_TRACE("Sending TaskCompletionAckReply to cfored of task id {}",task_id);
CRANE_TRACE("Sending TaskCompletionAckReply to cfored of task id {}",
task_id);
StreamCtldReply reply;
reply.set_type(StreamCtldReply::TASK_COMPLETION_ACK_REPLY);

Expand Down Expand Up @@ -271,8 +278,8 @@ class CtldServer {

inline void Wait() { m_server_->Wait(); }

result::result<std::future<task_id_t>, std::string> SubmitTaskToScheduler(
std::unique_ptr<TaskInCtld> task);
result::result<std::future<result::result<task_id_t, CraneErr>>, std::string>
SubmitTaskToScheduler(std::unique_ptr<TaskInCtld> task);

private:
template <typename K, typename V,
Expand Down
25 changes: 25 additions & 0 deletions src/CraneCtld/CtldPublicDefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ struct TaskInCtld {
bool requeue_if_failed{false};
bool get_user_env{false};

crane::grpc::Dependencies dependencies;

std::string cmd_line;
std::unordered_map<std::string, std::string> env;
std::string cwd;
Expand Down Expand Up @@ -308,6 +310,8 @@ struct TaskInCtld {
crane::grpc::TaskStatus status;
uint32_t exit_code;
bool held{false};
bool dependency_ok{false};
std::vector<task_id_t> dependency_ids;

// If this task is PENDING, start_time is either not set (default constructed)
// or an estimated start time.
Expand Down Expand Up @@ -458,6 +462,21 @@ struct TaskInCtld {
resources = std::move(val);
}
ResourceV2 const& Resources() const { return resources; }

// Flags the task's dependencies as OK, writing the flag both to the
// runtime_attr protobuf and to the in-memory dependency_ok member.
void SetDependencyOK() {
  runtime_attr.set_dependency_ok(true);
  dependency_ok = true;
}
// Returns true when the task declares at least one dependency condition and
// dependency_ok is still false.
bool HasDependency() const {
  if (dependency_ok) return false;
  return dependencies.dependencies_size() != 0;
}
void DependencyAdd(const std::vector<task_id_t>& val) {
dependency_ids.insert(dependency_ids.end(), val.begin(), val.end());
for (auto const& id : val) runtime_attr.add_dependency_ids(id);
}
// Returns true when the number of ids recorded in dependency_ids equals the
// number of declared dependency conditions.
// NOTE(review): per the comment on RuntimeAttrOfTask.dependency_ids, the
// recorded ids are the satisfied ones in "all" mode and the unsatisfied ones
// in "any" mode -- callers presumably interpret the result accordingly.
bool NoWaitingDependency() const {
  const auto recorded = dependency_ids.size();
  const auto declared = dependencies.dependencies_size();
  return recorded == declared;
}

void SetFieldsByTaskToCtld(crane::grpc::TaskToCtld const& val) {
task_to_ctld = val;
Expand Down Expand Up @@ -506,6 +525,8 @@ struct TaskInCtld {
get_user_env = val.get_user_env();

extra_attr = val.extra_attr();

dependencies = val.dependencies();
}

void SetFieldsByRuntimeAttr(crane::grpc::RuntimeAttrOfTask const& val) {
Expand All @@ -522,6 +543,10 @@ struct TaskInCtld {
status = runtime_attr.status();
held = runtime_attr.held();

dependency_ok = runtime_attr.dependency_ok();
dependency_ids.assign(runtime_attr.dependency_ids().begin(),
runtime_attr.dependency_ids().end());

if (status != crane::grpc::TaskStatus::Pending) {
craned_ids.assign(runtime_attr.craned_ids().begin(),
runtime_attr.craned_ids().end());
Expand Down
Loading

0 comments on commit 9c2353e

Please sign in to comment.