Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev/dependency #308

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,23 @@ enum InteractiveTaskType {
Crun = 1;
}

// Kind of condition a task places on another task it depends on.
// Used as the `type` of a DependencyCondition.
// NOTE(review): the value names look like Slurm's --dependency kinds
// (after / afterany / afterok / afternotok); confirm the exact semantics of
// each value against the DependencyManager implementation.
enum DependencyType {
AFTER = 0;
AFTER_ANY = 1;
AFTER_OK = 2;
AFTER_NOT_OK = 3;
}

// A single dependency: the id of the task being depended on, together with
// the kind of condition (DependencyType) placed on that task.
message DependencyCondition {
uint32 task_id = 1;
DependencyType type = 2;
}

// The set of dependency conditions attached to a task at submission time
// (carried as the `dependencies` field of TaskToCtld).
message Dependencies{
repeated DependencyCondition dependencies = 1;
// NOTE(review): presumably true means every condition must hold and false
// means any single condition suffices; confirm against the scheduler.
bool depend_all = 2;
}

message TaskToCtld {
/* -------- Fields that are set at the submission time. ------- */
google.protobuf.Duration time_limit = 1;
Expand All @@ -128,6 +145,8 @@ message TaskToCtld {

bool requeue_if_failed = 12;
bool get_user_env = 13;

Dependencies dependencies = 14;

oneof payload {
BatchTaskAdditionalMeta batch_meta = 21;
Expand Down Expand Up @@ -168,6 +187,11 @@ message RuntimeAttrOfTask {

bool held = 18;
ResourceV2 resources = 19;
bool dependency_ok = 20;
// If this task depends on all of its dependencies, store the satisfied ones.
// If this task depends on any one of its dependencies, store the unsatisfied ones.
// Task ids must be stored so that this state can be restored.
repeated uint32 dependency_ids = 21;
}

message TaskToD {
Expand Down Expand Up @@ -249,6 +273,7 @@ message TaskInfo {
string extra_attr = 20;

// Dynamic task information
uint32 dependency_state = 29;
bool held = 30;
TaskStatus status = 31;

Expand Down
2 changes: 2 additions & 0 deletions src/CraneCtld/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ add_executable(cranectld
CranedMetaContainer.cpp
AccountManager.h
AccountManager.cpp
DependencyManager.h
DependencyManager.cpp
EmbeddedDbClient.cpp
EmbeddedDbClient.h
CraneCtld.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/CraneCtld/CraneCtld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "CtldGrpcServer.h"
#include "CtldPublicDefs.h"
#include "DbClient.h"
#include "DependencyManager.h"
#include "EmbeddedDbClient.h"
#include "TaskScheduler.h"
#include "crane/Logger.h"
Expand Down Expand Up @@ -626,6 +627,7 @@ void DestroyCtldGlobalVariables() {

g_task_scheduler.reset();
g_craned_keeper.reset();
g_dependency_manager.reset();

g_plugin_client.reset();

Expand Down Expand Up @@ -755,6 +757,8 @@ void InitializeCtldGlobalVariables() {
}
}

g_dependency_manager = std::make_unique<DependencyManager>();

g_task_scheduler = std::make_unique<TaskScheduler>();
ok = g_task_scheduler->Init();
if (!ok) {
Expand Down
37 changes: 24 additions & 13 deletions src/CraneCtld/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTask(

auto result = m_ctld_server_->SubmitTaskToScheduler(std::move(task));
if (result.has_value()) {
task_id_t id = result.value().get();
if (id != 0) {
auto submit_result = result.value().get();
if (submit_result.has_value()) {
response->set_ok(true);
task_id_t id = submit_result.value();
response->set_task_id(id);
} else {
response->set_ok(false);
response->set_reason(
"System error occurred or "
"the number of pending tasks exceeded maximum value.");
response->set_reason(std::string(CraneErrStr(submit_result.error())));
}
} else {
response->set_ok(false);
Expand All @@ -58,7 +57,9 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTasks(
grpc::ServerContext *context,
const crane::grpc::SubmitBatchTasksRequest *request,
crane::grpc::SubmitBatchTasksReply *response) {
std::vector<result::result<std::future<task_id_t>, std::string>> results;
std::vector<result::result<std::future<result::result<task_id_t, CraneErr>>,
std::string>>
results;

uint32_t task_count = request->count();
const auto &task_to_ctld = request->task();
Expand All @@ -73,9 +74,14 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTasks(
}

for (auto &res : results) {
if (res.has_value())
response->mutable_task_id_list()->Add(res.value().get());
else
if (res.has_value()) {
auto submit_res = res.value().get();
if (submit_res.has_value())
response->mutable_task_id_list()->Add(submit_res.value());
else
response->mutable_reason_list()->Add(
std::string(CraneErrStr(submit_res.error())));
} else
response->mutable_reason_list()->Add(res.error());
}

Expand Down Expand Up @@ -1086,8 +1092,13 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
m_ctld_server_->SubmitTaskToScheduler(std::move(task));
result::result<task_id_t, std::string> result;
if (submit_result.has_value()) {
result = result::result<task_id_t, std::string>{
submit_result.value().get()};
auto submit_final_result = submit_result.value().get();
if (submit_final_result.has_value()) {
result = result::result<task_id_t, std::string>{
submit_final_result.value()};
} else {
result = result::fail(CraneErrStr(submit_final_result.error()));
}
} else {
result = result::fail(submit_result.error());
}
Expand Down Expand Up @@ -1210,7 +1221,7 @@ CtldServer::CtldServer(const Config::CraneCtldListenConf &listen_conf) {
signal(SIGINT, &CtldServer::signal_handler_func);
}

result::result<std::future<task_id_t>, std::string>
result::result<std::future<result::result<task_id_t, CraneErr>>, std::string>
CtldServer::SubmitTaskToScheduler(std::unique_ptr<TaskInCtld> task) {
CraneErr err;

Expand Down Expand Up @@ -1260,7 +1271,7 @@ CtldServer::SubmitTaskToScheduler(std::unique_ptr<TaskInCtld> task) {

if (err == CraneErr::kOk) {
task->SetSubmitTime(absl::Now());
std::future<task_id_t> future =
std::future<result::result<task_id_t, CraneErr>> future =
g_task_scheduler->SubmitTaskAsync(std::move(task));
return {std::move(future)};
}
Expand Down
27 changes: 17 additions & 10 deletions src/CraneCtld/CtldGrpcServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ class CforedStreamWriter {
crane::grpc::StreamCforedRequest> *stream)
: m_stream_(stream), m_valid_(true) {}

bool WriteTaskIdReply(
pid_t calloc_pid,
result::result<task_id_t, std::string> res) {
bool WriteTaskIdReply(pid_t calloc_pid,
result::result<task_id_t, std::string> res) {
LockGuard guard(&m_stream_mtx_);
if (!m_valid_) return false;

Expand All @@ -64,8 +63,11 @@ class CforedStreamWriter {
return m_stream_->Write(reply);
}

bool WriteTaskResAllocReply(task_id_t task_id,
result::result<std::pair<std::string,std::list<std::string>>, std::string> res) {
bool WriteTaskResAllocReply(
task_id_t task_id,
result::result<std::pair<std::string, std::list<std::string>>,
std::string>
res) {
LockGuard guard(&m_stream_mtx_);
if (!m_valid_) return false;

Expand All @@ -76,8 +78,12 @@ class CforedStreamWriter {

if (res.has_value()) {
task_res_alloc_reply->set_ok(true);
task_res_alloc_reply->set_allocated_craned_regex(std::move(res.value().first));
std::ranges::for_each(res.value().second,[&task_res_alloc_reply](const auto& craned_id){task_res_alloc_reply->add_craned_ids(craned_id);});
task_res_alloc_reply->set_allocated_craned_regex(
std::move(res.value().first));
std::ranges::for_each(res.value().second,
[&task_res_alloc_reply](const auto &craned_id) {
task_res_alloc_reply->add_craned_ids(craned_id);
});
} else {
task_res_alloc_reply->set_ok(false);
task_res_alloc_reply->set_failure_reason(std::move(res.error()));
Expand All @@ -89,7 +95,8 @@ class CforedStreamWriter {
bool WriteTaskCompletionAckReply(task_id_t task_id) {
LockGuard guard(&m_stream_mtx_);
if (!m_valid_) return false;
CRANE_TRACE("Sending TaskCompletionAckReply to cfored of task id {}",task_id);
CRANE_TRACE("Sending TaskCompletionAckReply to cfored of task id {}",
task_id);
StreamCtldReply reply;
reply.set_type(StreamCtldReply::TASK_COMPLETION_ACK_REPLY);

Expand Down Expand Up @@ -271,8 +278,8 @@ class CtldServer {

inline void Wait() { m_server_->Wait(); }

result::result<std::future<task_id_t>, std::string> SubmitTaskToScheduler(
std::unique_ptr<TaskInCtld> task);
result::result<std::future<result::result<task_id_t, CraneErr>>, std::string>
SubmitTaskToScheduler(std::unique_ptr<TaskInCtld> task);

private:
template <typename K, typename V,
Expand Down
25 changes: 25 additions & 0 deletions src/CraneCtld/CtldPublicDefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ struct TaskInCtld {
bool requeue_if_failed{false};
bool get_user_env{false};

crane::grpc::Dependencies dependencies;

std::string cmd_line;
std::unordered_map<std::string, std::string> env;
std::string cwd;
Expand Down Expand Up @@ -308,6 +310,8 @@ struct TaskInCtld {
crane::grpc::TaskStatus status;
uint32_t exit_code;
bool held{false};
bool dependency_ok{false};
std::vector<task_id_t> dependency_ids;

// If this task is PENDING, start_time is either not set (default constructed)
// or an estimated start time.
Expand Down Expand Up @@ -458,6 +462,21 @@ struct TaskInCtld {
resources = std::move(val);
}
ResourceV2 const& Resources() const { return resources; }

// Sets the dependency_ok flag to true, both in the runtime_attr protobuf
// and in the cached member that HasDependency() reads.
void SetDependencyOK() {
  runtime_attr.set_dependency_ok(true);
  dependency_ok = true;
}
// Returns true when at least one dependency condition was declared for this
// task and dependency_ok has not been set yet.
bool HasDependency() const {
  if (dependency_ok) return false;
  return dependencies.dependencies_size() != 0;
}
void DependencyAdd(const std::vector<task_id_t>& val) {
dependency_ids.insert(dependency_ids.end(), val.begin(), val.end());
for (auto const& id : val) runtime_attr.add_dependency_ids(id);
}
bool NoWaitingDependency() const {
return dependency_ids.size() == dependencies.dependencies_size();
}

void SetFieldsByTaskToCtld(crane::grpc::TaskToCtld const& val) {
task_to_ctld = val;
Expand Down Expand Up @@ -506,6 +525,8 @@ struct TaskInCtld {
get_user_env = val.get_user_env();

extra_attr = val.extra_attr();

dependencies = val.dependencies();
}

void SetFieldsByRuntimeAttr(crane::grpc::RuntimeAttrOfTask const& val) {
Expand All @@ -522,6 +543,10 @@ struct TaskInCtld {
status = runtime_attr.status();
held = runtime_attr.held();

dependency_ok = runtime_attr.dependency_ok();
dependency_ids.assign(runtime_attr.dependency_ids().begin(),
runtime_attr.dependency_ids().end());

if (status != crane::grpc::TaskStatus::Pending) {
craned_ids.assign(runtime_attr.craned_ids().begin(),
runtime_attr.craned_ids().end());
Expand Down
Loading