Skip to content

Commit

Permalink
Bugfix/sigchld (#316)
Browse files Browse the repository at this point in the history
* refactor sigchild handling

Signed-off-by: xiafeng <[email protected]>

* refactor task status on launch failure

Signed-off-by: xiafeng <[email protected]>

* refactor.

Signed-off-by: xiafeng <[email protected]>

* typo

Signed-off-by: xiafeng <[email protected]>

* refactor

Signed-off-by: xiafeng <[email protected]>

* typo

Signed-off-by: xiafeng <[email protected]>

* Refactor.

---------

Signed-off-by: xiafeng <[email protected]>
Co-authored-by: RileyWen <[email protected]>
  • Loading branch information
L-Xiafeng and RileyWen authored Sep 26, 2024
1 parent 8a21a72 commit 442c571
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 95 deletions.
2 changes: 2 additions & 0 deletions src/Craned/CranedPublicDefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

namespace Craned {

// Delay before a SIGCHLD record whose pid is not yet present in the pid index
// maps is queued again for processing.
// NOTE(review): despite the "Ms" suffix, the consumer builds a timeval as
// {value / 1'000'000, value % 1'000'000}, i.e. the value is interpreted as
// MICROSECONDS (500'000 us == 0.5 s). The name is kept for compatibility;
// confirm the intended unit before changing the value.
inline constexpr uint64_t kEvSigChldResendMs = 500'000;

using EnvPair = std::pair<std::string, std::string>;

struct TaskStatusChange {
Expand Down
232 changes: 145 additions & 87 deletions src/Craned/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,19 @@ TaskManager::TaskManager() {
std::terminate();
}
}
{
m_ev_process_sigchld_ = event_new(m_ev_base_, -1, EV_PERSIST | EV_READ,
EvProcessSigchldCb_, this);
if (!m_ev_process_sigchld_) {
CRANE_ERROR("Failed to create the Do SIGCHLD event!");
std::terminate();
}

if (event_add(m_ev_process_sigchld_, nullptr) < 0) {
CRANE_ERROR("Could not add the Do SIGCHLD event to base!");
std::terminate();
}
}
{ // SIGINT
m_ev_sigint_ = evsignal_new(m_ev_base_, SIGINT, EvSigintCb_, this);
if (!m_ev_sigint_) {
Expand Down Expand Up @@ -267,13 +280,13 @@ void TaskManager::TaskStopAndDoStatusChangeAsync(uint32_t task_id) {

switch (instance->err_before_exec) {
case CraneErr::kProtobufError:
EvActivateTaskStatusChange_(task_id, crane::grpc::TaskStatus::Cancelled,
EvActivateTaskStatusChange_(task_id, crane::grpc::TaskStatus::Failed,
ExitCode::kExitCodeSpawnProcessFail,
std::nullopt);
break;

case CraneErr::kCgroupError:
EvActivateTaskStatusChange_(task_id, crane::grpc::TaskStatus::Cancelled,
EvActivateTaskStatusChange_(task_id, crane::grpc::TaskStatus::Failed,
ExitCode::kExitCodeCgroupError, std::nullopt);
break;

Expand Down Expand Up @@ -321,23 +334,29 @@ void TaskManager::EvSigchldCb_(evutil_socket_t sig, short events,
assert(m_instance_ptr_->m_instance_ptr_ != nullptr);
auto* this_ = reinterpret_cast<TaskManager*>(user_data);

ProcSigchldInfo sigchld_info{};

int status;
pid_t pid;
while (true) {
pid = waitpid(-1, &status, WNOHANG
/* TODO(More status tracing): | WUNTRACED | WCONTINUED */);

if (pid > 0) {
auto sigchld_info = std::make_unique<ProcSigchldInfo>();

if (WIFEXITED(status)) {
// Exited with status WEXITSTATUS(status)
sigchld_info = {pid, false, WEXITSTATUS(status)};
sigchld_info->pid = pid;
sigchld_info->is_terminated_by_signal = false;
sigchld_info->value = WEXITSTATUS(status);

CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: false, Status: {}",
pid, WEXITSTATUS(status));
} else if (WIFSIGNALED(status)) {
// Killed by signal WTERMSIG(status)
sigchld_info = {pid, true, WTERMSIG(status)};
sigchld_info->pid = pid;
sigchld_info->is_terminated_by_signal = true;
sigchld_info->value = WTERMSIG(status);

CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: true, Signal: {}",
pid, WTERMSIG(status));
}
Expand All @@ -347,61 +366,8 @@ void TaskManager::EvSigchldCb_(evutil_socket_t sig, short events,
} else if (WIFCONTINUED(status)) {
printf("continued\n");
} */

this_->m_mtx_.Lock();

auto task_iter = this_->m_pid_task_map_.find(pid);
auto proc_iter = this_->m_pid_proc_map_.find(pid);
if (task_iter == this_->m_pid_task_map_.end() ||
proc_iter == this_->m_pid_proc_map_.end()) {
CRANE_WARN("Failed to find task id for pid {}.", pid);
this_->m_mtx_.Unlock();
} else {
TaskInstance* instance = task_iter->second;
ProcessInstance* proc = proc_iter->second;
uint32_t task_id = instance->task.task_id();

// Remove indexes from pid to ProcessInstance*
this_->m_pid_proc_map_.erase(proc_iter);
this_->m_pid_task_map_.erase(task_iter);

this_->m_mtx_.Unlock();

instance->sigchld_info = sigchld_info;
proc->Finish(sigchld_info.is_terminated_by_signal, sigchld_info.value);

// Free the ProcessInstance. ITask struct is not freed here because
// the ITask for an Interactive task can have no ProcessInstance.
auto pr_it = instance->processes.find(pid);
if (pr_it == instance->processes.end()) {
CRANE_ERROR("Failed to find pid {} in task #{}'s ProcessInstances",
pid, task_id);
} else {
instance->processes.erase(pr_it);

if (!instance->processes.empty()) {
if (sigchld_info.is_terminated_by_signal) {
// If a task is terminated by a signal and there are other
// running processes belonging to this task, kill them.
this_->TerminateTaskAsync(task_id);
}
} else {
if (instance->task.interactive_meta().interactive_type() ==
crane::grpc::Crun)
// TaskStatusChange of a crun task is triggered in
// CforedManager.
g_cfored_manager->TaskProcOnCforedStopped(
instance->task.interactive_meta().cfored_name(),
instance->task.task_id());
else /* Batch / Calloc */ {
// If the ProcessInstance has no process left,
// send TaskStatusChange for this task.
// See the comment of EvActivateTaskStatusChange_.
this_->TaskStopAndDoStatusChangeAsync(task_id);
}
}
}
}
this_->m_sigchld_queue_.enqueue(std::move(sigchld_info));
event_active(this_->m_ev_process_sigchld_, 0, 0);
} else if (pid == 0) {
// There's no child that needs reaping.
// If Craned is exiting, check if there's any task remaining.
Expand All @@ -420,6 +386,101 @@ void TaskManager::EvSigchldCb_(evutil_socket_t sig, short events,
}
}

void TaskManager::EvProcessSigchldCb_(int sig, short events, void* user_data) {
auto* this_ = reinterpret_cast<TaskManager*>(user_data);

std::unique_ptr<ProcSigchldInfo> sigchld_info;
while (this_->m_sigchld_queue_.try_dequeue(sigchld_info)) {
auto pid = sigchld_info->pid;

if (sigchld_info->resend_timer != nullptr) {
evtimer_del(sigchld_info->resend_timer);
event_free(sigchld_info->resend_timer);
sigchld_info->resend_timer = nullptr;
}

this_->m_mtx_.Lock();
auto task_iter = this_->m_pid_task_map_.find(pid);
auto proc_iter = this_->m_pid_proc_map_.find(pid);

if (task_iter == this_->m_pid_task_map_.end() ||
proc_iter == this_->m_pid_proc_map_.end()) {
this_->m_mtx_.Unlock();

EvQueueSigchldArg* arg = new EvQueueSigchldArg;

timeval tv{kEvSigChldResendMs / 1000'000, kEvSigChldResendMs % 1000'000};
sigchld_info->resend_timer =
event_new(this_->m_ev_base_, -1, 0, EvOnSigchldTimerCb_, arg);
evtimer_add(sigchld_info->resend_timer, &tv);

CRANE_ASSERT_MSG(sigchld_info->resend_timer != nullptr,
"Failed to create new timer.");

arg->task_manager = this_;
arg->sigchld_info = std::move(sigchld_info);

CRANE_TRACE("Child Process {} exit too early, will do SigchldCb later",
sigchld_info->pid);
continue;
}

TaskInstance* instance = task_iter->second;
ProcessInstance* proc = proc_iter->second;
uint32_t task_id = instance->task.task_id();

// Remove indexes from pid to ProcessInstance*
this_->m_pid_proc_map_.erase(proc_iter);
this_->m_pid_task_map_.erase(task_iter);

this_->m_mtx_.Unlock();

instance->sigchld_info = *sigchld_info;
proc->Finish(sigchld_info->is_terminated_by_signal, sigchld_info->value);

// Free the ProcessInstance. ITask struct is not freed here because
// the ITask for an Interactive task can have no ProcessInstance.
auto pr_it = instance->processes.find(pid);
if (pr_it == instance->processes.end()) {
CRANE_ERROR("Failed to find pid {} in task #{}'s ProcessInstances", pid,
task_id);
} else {
instance->processes.erase(pr_it);

if (!instance->processes.empty()) {
if (sigchld_info->is_terminated_by_signal) {
// If a task is terminated by a signal and there are other
// running processes belonging to this task, kill them.
this_->TerminateTaskAsync(task_id);
}
} else {
if (instance->IsCrun())
// TaskStatusChange of a crun task is triggered in
// CforedManager.
g_cfored_manager->TaskProcOnCforedStopped(
instance->task.interactive_meta().cfored_name(),
instance->task.task_id());
else /* Batch / Calloc */ {
// If the ProcessInstance has no process left,
// send TaskStatusChange for this task.
// See the comment of EvActivateTaskStatusChange_.
this_->TaskStopAndDoStatusChangeAsync(task_id);
}
}
}
}
}

void TaskManager::EvOnSigchldTimerCb_(int, short, void* arg_) {
auto* arg = reinterpret_cast<EvQueueSigchldArg*>(arg_);
auto* this_ = arg->task_manager;

this_->m_sigchld_queue_.enqueue(std::move(arg->sigchld_info));
event_active(this_->m_ev_process_sigchld_, 0, 0);

delete arg;
}

void TaskManager::EvSubprocessReadCb_(struct bufferevent* bev, void* process) {
auto* proc = reinterpret_cast<ProcessInstance*>(process);

Expand Down Expand Up @@ -547,8 +608,8 @@ void TaskManager::SetSigintCallback(std::function<void()> cb) {
m_sigint_cb_ = std::move(cb);
}

CraneErr TaskManager::SpawnProcessInInstance_(
TaskInstance* instance, std::unique_ptr<ProcessInstance> process) {
CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance,
ProcessInstance* process) {
using google::protobuf::io::FileInputStream;
using google::protobuf::io::FileOutputStream;
using google::protobuf::util::ParseDelimitedFromZeroCopyStream;
Expand Down Expand Up @@ -636,25 +697,6 @@ CraneErr TaskManager::SpawnProcessInInstance_(
instance->task.task_id(), meta->proc_in_fd, meta->proc_out_fd);
}

// Note that the following code will move the child process into cgroup.
// Once the child process is moved into cgroup, it might be killed due to
// memory limitation.
// Since the task status change is triggered by SIGCHLD,
// we should put the child pid into the index map IMMEDIATELY when fork() is
// done.
// Otherwise, SIGCHLD handler will not find the pid if the child process
// stops really fast, for example, due to being killed by oom killer.
// In such case, the task status change for this quickly dying job will not
// be triggered, and it will cause infinitely running jobs which are
// actually dead.
m_mtx_.Lock();
m_pid_task_map_.emplace(child_pid, instance);
m_pid_proc_map_.emplace(child_pid, process.get());
m_mtx_.Unlock();

// Move the ownership of ProcessInstance into the TaskInstance.
instance->processes.emplace(child_pid, std::move(process));

int ctrl_fd = ctrl_sock_pair[0];
close(ctrl_sock_pair[1]);
if (instance->IsCrun()) {
Expand Down Expand Up @@ -725,7 +767,7 @@ CraneErr TaskManager::SpawnProcessInInstance_(
// The child process will be reaped in SIGCHLD handler and
// thus only ONE TaskStatusChange will be triggered!
instance->err_before_exec = CraneErr::kProtobufError;
KillProcessInstance_(process.get(), SIGKILL);
KillProcessInstance_(process, SIGKILL);
return CraneErr::kOk;
}

Expand All @@ -742,7 +784,7 @@ CraneErr TaskManager::SpawnProcessInInstance_(

// See comments above.
instance->err_before_exec = CraneErr::kProtobufError;
KillProcessInstance_(process.get(), SIGKILL);
KillProcessInstance_(process, SIGKILL);
return CraneErr::kOk;
}

Expand All @@ -760,7 +802,7 @@ CraneErr TaskManager::SpawnProcessInInstance_(

// See comments above.
instance->err_before_exec = CraneErr::kProtobufError;
KillProcessInstance_(process.get(), SIGKILL);
KillProcessInstance_(process, SIGKILL);
}

// See comments above.
Expand Down Expand Up @@ -1072,14 +1114,30 @@ void TaskManager::LaunchTaskInstanceMt_(TaskInstance* instance) {
// or fork() fails.
// In this case, SIGCHLD will NOT be received for this task, and
// we should send TaskStatusChange manually.
CraneErr err = SpawnProcessInInstance_(instance, std::move(process));
CraneErr err = SpawnProcessInInstance_(instance, process.get());
if (err != CraneErr::kOk) {
EvActivateTaskStatusChange_(
task_id, crane::grpc::TaskStatus::Failed,
ExitCode::kExitCodeSpawnProcessFail,
fmt::format(
"Cannot spawn a new process inside the instance of task #{}",
task_id));
} else {
// kOk means that SpawnProcessInInstance_ has successfully forked a child
// process.
// Now we put the child pid into index maps.
// SIGCHLD sent just after fork() and before putting pid into maps
// will repeatedly be sent by timer and eventually be handled once the
// SIGCHLD processing callback sees the pid in index maps.
m_mtx_.Lock();
m_pid_task_map_.emplace(process->GetPid(), instance);
m_pid_proc_map_.emplace(process->GetPid(), process.get());

// Move the ownership of ProcessInstance into the TaskInstance.
// Make sure existing process can be found when handling SIGCHLD.
instance->processes.emplace(process->GetPid(), std::move(process));

m_mtx_.Unlock();
}
}

Expand Down Expand Up @@ -1221,7 +1279,7 @@ void TaskManager::EvGrpcQueryTaskIdFromPidCb_(int efd, short events,
}
}

void TaskManager::EvOnTimerCb_(int, short, void* arg_) {
void TaskManager::EvOnTaskTimerCb_(int, short, void* arg_) {
auto* arg = reinterpret_cast<EvTimerCbArg*>(arg_);
TaskManager* this_ = arg->task_manager;
task_id_t task_id = arg->task_id;
Expand Down
Loading

0 comments on commit 442c571

Please sign in to comment.