Skip to content

Commit

Permalink
Refactor: Crun support pty
Browse files Browse the repository at this point in the history
Signed-off-by: Li Junlin <[email protected]>
  • Loading branch information
L-Xiafeng committed Nov 14, 2024
1 parent 4265602 commit c1b04b1
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 58 deletions.
9 changes: 5 additions & 4 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,11 @@ message BatchTaskAdditionalMeta {
}

message InteractiveTaskAdditionalMeta{
string cfored_name = 1;
string sh_script = 2;
string term_env = 3;
InteractiveTaskType interactive_type = 4;
InteractiveTaskType interactive_type = 1;
string cfored_name = 2;
string sh_script = 3;
string term_env = 4;
bool pty = 5;
}

message TaskInfo {
Expand Down
60 changes: 35 additions & 25 deletions src/Craned/CforedClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "CforedClient.h"

#include <cerrno>
#include <utility>

#include "crane/String.h"
Expand Down Expand Up @@ -342,12 +343,8 @@ void CforedManager::EvLoopThread_(const std::shared_ptr<uvw::loop>& uvw_loop) {
}

void CforedManager::RegisterIOForward(std::string const& cfored,
task_id_t task_id, int in_fd,
int out_fd) {
RegisterElem elem{.cfored = cfored,
.task_id = task_id,
.in_fd = in_fd,
.out_fd = out_fd};
task_id_t task_id, int fd) {
RegisterElem elem{.cfored = cfored, .task_id = task_id, .fd = fd};
std::promise<bool> done;
std::future<bool> done_fut = done.get_future();

Expand All @@ -371,7 +368,7 @@ void CforedManager::RegisterCb_() {
}

m_cfored_client_map_[elem.cfored]->InitTaskFwdAndSetInputCb(
elem.task_id, [fd = elem.in_fd](const std::string& msg) -> bool {
elem.task_id, [fd = elem.fd](const std::string& msg) -> bool {
ssize_t sz_sent = 0, sz_written;
while (sz_sent != msg.size()) {
sz_written = write(fd, msg.c_str() + sz_sent, msg.size() - sz_sent);
Expand All @@ -385,9 +382,9 @@ void CforedManager::RegisterCb_() {
return true;
});

CRANE_TRACE("Registering fd {} for outputs of task #{}", elem.out_fd,
CRANE_TRACE("Registering fd {} for outputs of task #{}", elem.fd,
elem.task_id);
auto poll_handle = m_loop_->resource<uvw::poll_handle>(elem.out_fd);
auto poll_handle = m_loop_->resource<uvw::poll_handle>(elem.fd);
poll_handle->on<uvw::poll_event>([this, elem = std::move(elem)](
const uvw::poll_event&,
uvw::poll_handle& h) {
Expand All @@ -396,25 +393,38 @@ void CforedManager::RegisterCb_() {
constexpr int MAX_BUF_SIZE = 4096;
char buf[MAX_BUF_SIZE];

auto ret = read(elem.out_fd, buf, MAX_BUF_SIZE);
auto ret = read(elem.fd, buf, MAX_BUF_SIZE);
if (ret == 0) {
CRANE_TRACE("Task #{} to cfored {} finished its output.", elem.task_id,
elem.cfored);
h.close();
close(elem.out_fd);

bool ok_to_free =
m_cfored_client_map_[elem.cfored]->TaskOutputFinish(elem.task_id);
if (ok_to_free) {
CRANE_TRACE("It's ok to unregister task #{} on {}", elem.task_id,
elem.cfored);
UnregisterIOForward_(elem.cfored, elem.task_id);
}
return;
CRANE_ASSERT(false);
}

if (ret == -1)
CRANE_ERROR("Error when reading task #{} output", elem.task_id);
if (ret == -1) {
if (errno == EIO) {
// For pty output, the read() will return -1 with errno set to EIO
// when process exit.
// ref: https://unix.stackexchange.com/questions/538198
CRANE_TRACE("Task #{} to cfored {} finished its output.",
elem.task_id, elem.cfored);
h.close();
close(elem.fd);

bool ok_to_free =
m_cfored_client_map_[elem.cfored]->TaskOutputFinish(elem.task_id);
if (ok_to_free) {
CRANE_TRACE("It's ok to unregister task #{} on {}", elem.task_id,
elem.cfored);
UnregisterIOForward_(elem.cfored, elem.task_id);
}
return;
} else if (errno == EAGAIN) {
// Read before the process begin.
return;
} else {
CRANE_ERROR("Error when reading task #{} output, error {}",
elem.task_id, std::strerror(errno));
return;
}
}

std::string output(buf, ret);
CRANE_TRACE("Fwd to task #{}: {}", elem.task_id, output);
Expand Down
6 changes: 2 additions & 4 deletions src/Craned/CforedClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,14 @@ class CforedManager {

bool Init();

void RegisterIOForward(std::string const& cfored, task_id_t task_id,
int in_fd, int out_fd);
void RegisterIOForward(std::string const& cfored, task_id_t task_id, int fd);
void TaskProcOnCforedStopped(std::string const& cfored, task_id_t task_id);

private:
struct RegisterElem {
std::string cfored;
task_id_t task_id;
int in_fd;
int out_fd;
int fd;
};

struct TaskStopElem {
Expand Down
33 changes: 11 additions & 22 deletions src/Craned/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <fcntl.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/util/delimited_message_util.h>
#include <pty.h>

#include "CforedClient.h"
#include "CgroupManager.h"
Expand Down Expand Up @@ -644,26 +645,6 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance,
return CraneErr::kSystemErr;
}

// Create IO socket pair for crun tasks.
if (instance->IsCrun()) {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, io_in_sock_pair) != 0) {
CRANE_ERROR("Failed to create socket pair for task io forward: {}",
strerror(errno));
return CraneErr::kSystemErr;
}

if (socketpair(AF_UNIX, SOCK_STREAM, 0, io_out_sock_pair) != 0) {
CRANE_ERROR("Failed to create socket pair for task io forward: {}",
strerror(errno));
return CraneErr::kSystemErr;
}

auto* crun_meta =
dynamic_cast<CrunMetaInTaskInstance*>(instance->meta.get());
crun_meta->proc_in_fd = io_in_sock_pair[0];
crun_meta->proc_out_fd = io_out_sock_pair[0];
}

// save the current uid/gid
SavedPrivilege saved_priv{getuid(), getgid()};

Expand All @@ -680,7 +661,15 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance,
return CraneErr::kSystemErr;
}

pid_t child_pid = fork();
pid_t child_pid;

if (instance->IsCrun()) {
auto* crun_meta =
dynamic_cast<CrunMetaInTaskInstance*>(instance->meta.get());
child_pid = forkpty(&crun_meta->msg_fd, NULL, NULL, NULL);
} else {
child_pid = fork();
}
if (child_pid == -1) {
CRANE_ERROR("fork() failed for task #{}: {}", instance->task.task_id(),
strerror(errno));
Expand All @@ -696,7 +685,7 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance,
auto* meta = dynamic_cast<CrunMetaInTaskInstance*>(instance->meta.get());
g_cfored_manager->RegisterIOForward(
instance->task.interactive_meta().cfored_name(),
instance->task.task_id(), meta->proc_in_fd, meta->proc_out_fd);
instance->task.task_id(), meta->msg_fd);
}

int ctrl_fd = ctrl_sock_pair[0];
Expand Down
5 changes: 2 additions & 3 deletions src/Craned/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ struct BatchMetaInTaskInstance : MetaInTaskInstance {
};

struct CrunMetaInTaskInstance : MetaInTaskInstance {
int proc_in_fd;
int proc_out_fd;
int msg_fd;
~CrunMetaInTaskInstance() override = default;
};

Expand All @@ -176,7 +175,7 @@ struct TaskInstance {
}

if (this->IsCrun()) {
close(dynamic_cast<CrunMetaInTaskInstance*>(meta.get())->proc_in_fd);
close(dynamic_cast<CrunMetaInTaskInstance*>(meta.get())->msg_fd);
}
}

Expand Down

0 comments on commit c1b04b1

Please sign in to comment.